From b596984b44e3122713646327e4df157f7adae817 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Mon, 18 Oct 2021 13:42:32 -0300 Subject: [PATCH 01/21] Add ip_address option to setup() (#628) * Add ip_address option to setup() * Update README.md --- README.md | 7 +++++++ broadlink/__init__.py | 9 +++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 009315d1..d66ab643 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,13 @@ broadlink.setup('myssid', 'mynetworkpass', 3) Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) +#### Advanced options + +You may need to specify a broadcast address if setup is not working. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255') +``` + ### Discovery Use this function to discover devices: diff --git a/broadlink/__init__.py b/broadlink/__init__.py index fc64e844..5f715201 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -240,7 +240,12 @@ def xdiscover( # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid: str, password: str, security_mode: int) -> None: +def setup( + ssid: str, + password: str, + security_mode: int, + ip_address: str = DEFAULT_BCAST_ADDR, +) -> None: """Set up a new Broadlink device via AP mode.""" # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) payload = bytearray(0x88) @@ -269,5 +274,5 @@ def setup(ssid: str, password: str, security_mode: int) -> None: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, (DEFAULT_BCAST_ADDR, DEFAULT_PORT)) + sock.sendto(payload, (ip_address, DEFAULT_PORT)) sock.close() From 11febb043bf6cb574fcca29beef049f06713be20 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Mon, 18 Oct 2021 14:05:39 -0300 Subject: [PATCH 02/21] Improve README.md (#629) --- README.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index d66ab643..06b7f9c3 100644 --- a/README.md +++ b/README.md @@ -67,17 +67,19 @@ devices = broadlink.discover() #### Advanced options You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. +Using the IP address of your local machine: ```python3 -devices = broadlink.discover(local_ip_address='192.168.0.100') # IP address of your local machine. +devices = broadlink.discover(local_ip_address='192.168.0.100') ``` +Using the broadcast address of your subnet: ```python3 -devices = broadlink.discover(discover_ip_address='192.168.0.255') # Broadcast address of your subnet. +devices = broadlink.discover(discover_ip_address='192.168.0.255') ``` If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery: ```python3 -device = broadlink.hello('192.168.0.16') # IP address of your Broadlink device. +device = broadlink.hello('192.168.0.16') ``` If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly: @@ -112,7 +114,7 @@ packet = device.check_data() ### Learning RF codes -Learning IR codes takes place in five steps. +Learning RF codes takes place in five steps. 1. Sweep the frequency: ```python3 From 9873af9bc471a5cf12610485ae9dcdb9bc108bb4 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Mon, 18 Oct 2021 14:19:41 -0300 Subject: [PATCH 03/21] Standardize ip_address option (#630) --- broadlink/__init__.py | 8 ++++++-- broadlink/device.py | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 5f715201..560fd53d 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -192,7 +192,7 @@ def gendevice( def hello( - host: str, + ip_address: str, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT, ) -> Device: @@ -202,7 +202,11 @@ def hello( """ try: return next( - xdiscover(timeout=timeout, discover_ip_address=host, discover_ip_port=port) + xdiscover( + timeout=timeout, + discover_ip_address=ip_address, + discover_ip_port=port, + ) ) except StopIteration as err: raise e.NetworkTimeoutError( diff --git a/broadlink/device.py b/broadlink/device.py index 74e916f4..3bc9d8aa 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -76,7 +76,7 @@ def scan( conn.close() -def ping(address: str, port: int = DEFAULT_PORT) -> None: +def ping(ip_address: str, port: int = DEFAULT_PORT) -> None: """Send a ping packet to an address. This packet feeds the watchdog timer of firmwares >= v53. @@ -87,7 +87,7 @@ def ping(address: str, port: int = DEFAULT_PORT) -> None: conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) packet = bytearray(0x30) packet[0x26] = 1 - conn.sendto(packet, (address, port)) + conn.sendto(packet, (ip_address, port)) class Device: From f2a582b8f994791b1736b6336296a83b0ab51b4c Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Mon, 18 Oct 2021 15:59:47 -0300 Subject: [PATCH 04/21] Add support for Broadlink MP1 with power meter (#631) --- broadlink/__init__.py | 6 ++++-- broadlink/switch.py | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 560fd53d..d69c91d4 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -12,7 +12,7 @@ from .light import lb1, lb2 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro from .sensor import a1 -from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b +from .switch import bg1, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b SUPPORTED_TYPES = { sp1: { @@ -133,10 +133,12 @@ }, mp1: { 0x4EB5: ("MP1-1K4S", "Broadlink"), - 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), 0x4F65: ("MP1-1K3S2U", "Broadlink"), }, + mp1s: { + 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), + }, lb1: { 0x5043: ("SB800TD", "Broadlink (OEM)"), 0x504E: ("LB1", "Broadlink"), diff --git a/broadlink/switch.py b/broadlink/switch.py index 1079cde0..a2c15be2 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -360,3 +360,18 @@ def check_power(self) -> dict: "s3": bool(data & 4), "s4": bool(data & 8), } + + +class mp1s(mp1): + """Controls a Broadlink MP1S.""" + + TYPE = "MP1S" + + def get_energy(self) -> float: + """Return the power consumption in W.""" + packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + energy = payload[0x7:0x4:-1].hex() + return int(energy) / 100 From bb195043142582c210122b449d7686eb6311a04d Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Mon, 18 Oct 2021 18:23:05 -0300 Subject: [PATCH 05/21] Fix instructions for learning RF codes (#632) --- README.md | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 06b7f9c3..eea48d3f 100644 --- a/README.md +++ b/README.md @@ -114,23 +114,33 @@ packet = device.check_data() ### Learning RF codes -Learning RF codes takes place in five steps. +Learning RF codes takes place in six steps. 1. Sweep the frequency: ```python3 device.sweep_frequency() ``` 2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn. -3. Enter learning mode: +3. Check if the frequency was successfully identified: +```python3 +ok = device.check_frequency() +if ok: + print('Frequency found!') +``` +4. Enter learning mode: ```python3 device.find_rf_packet() ``` -4. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. -5. Get the RF packet: +5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. +6. Get the RF packet: ```python3 packet = device.check_data() ``` +#### Notes + +Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`. + ### Canceling learning You can exit the learning mode in the middle of the process by calling this method: From abcc9aaeed6723de333516f2f0bff79afb6ac372 Mon Sep 17 00:00:00 2001 From: fustom Date: Sun, 22 Jan 2023 05:50:37 +0100 Subject: [PATCH 06/21] Add heating_cooling state to Hysen (#722) --- broadlink/climate.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/broadlink/climate.py b/broadlink/climate.py index eee5f119..15aea900 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -64,6 +64,7 @@ def get_full_status(self) -> dict: data["power"] = payload[4] & 1 data["active"] = (payload[4] >> 4) & 1 data["temp_manual"] = (payload[4] >> 6) & 1 + data["heating_cooling"] = (payload[4] >> 7) & 1 data["room_temp"] = payload[5] / 2.0 data["thermostat_temp"] = payload[6] / 2.0 data["auto_mode"] = payload[7] & 0xF @@ -185,9 +186,11 @@ def set_temp(self, temp: float) -> None: # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. - def set_power(self, power: int = 1, remote_lock: int = 0) -> None: + # heating_cooling: heating(0) cooling(1) + def set_power(self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0) -> None: """Set the power state of the device.""" - self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power]) + state = (heating_cooling << 7) + power + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state]) # set time on device # n.b. day=1 is Monday, ..., day=7 is Sunday From 634370d8785d2483eb075221d4d30f0bfb3a27d3 Mon Sep 17 00:00:00 2001 From: Ian Munsie Date: Wed, 10 Apr 2024 04:40:00 +1000 Subject: [PATCH 07/21] Add ability to RF scan a specific frequency (#613) * Add ability to RF scan a specific frequency This adds an optional parameter to find_rf_packet(), along with a corresponding --rflearn parameter (defaulting to 433.92) to broadlink_cli that specifies the frequency to tune to, rather than requiring the frequency be found via sweeping. This is almost mandatory for certain types of remotes that do not repeat their signals while the button is held, and saves significant time when the frequency is known in advance or when many buttons are to be captured in a row. Additionally: - A get_frequency() API is added to return the current frequency the device is tuned to. - A check_frequency_ex() API is added to perform functions of both check_frequency() and get_frequency() in a single call. - broadlink_cli --rfscanlearn will now report the current frequency at 1 second intervals during sweeping, and will report the frequency it finally locks on to. * Clean up remote.py * Clean up broadlink_cli * Update conditional * Fix message --------- Co-authored-by: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> --- broadlink/remote.py | 14 ++++++++++---- cli/broadlink_cli | 46 +++++++++++++++++++++++++-------------------- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/broadlink/remote.py b/broadlink/remote.py index 017dac47..f4db3d2f 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -1,5 +1,6 @@ """Support for universal remotes.""" import struct +import typing as t from . import exceptions as e from .device import Device @@ -46,14 +47,19 @@ def sweep_frequency(self) -> None: """Sweep frequency.""" self._send(0x19) - def check_frequency(self) -> bool: + def check_frequency(self) -> t.Tuple[bool, float]: """Return True if the frequency was identified successfully.""" resp = self._send(0x1A) - return resp[0] == 1 + is_found = bool(resp[0]) + frequency = struct.unpack(" None: + def find_rf_packet(self, frequency: float = None) -> None: """Enter radiofrequency learning mode.""" - self._send(0x1B) + payload = bytearray() + if frequency: + payload += struct.pack(" None: """Cancel sweep frequency.""" diff --git a/cli/broadlink_cli b/cli/broadlink_cli index f7a24ade..1083e596 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -83,7 +83,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on parser.add_argument("--send", action="store_true", help="send command") parser.add_argument("--sensors", action="store_true", help="check all sensors") parser.add_argument("--learn", action="store_true", help="learn command") -parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning") +parser.add_argument("--rflearn", action="store_true", help="rf scan learning") +parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") parser.add_argument("--learnfile", help="save learned command to a specified file") parser.add_argument("--durations", action="store_true", help="use durations in micro seconds instead of the Broadlink format") @@ -127,7 +128,7 @@ if args.send: data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ if args.durations else bytearray.fromhex(''.join(args.data)) dev.send_data(data) -if args.learn or (args.learnfile and not args.rfscanlearn): +if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() print("Learning...") start = time.time() @@ -195,28 +196,33 @@ if args.switch: else: dev.set_power(True) print('* Switch to ON *') -if args.rfscanlearn: - dev.sweep_frequency() - print("Learning RF Frequency, press and hold the button to learn...") - - start = time.time() - while time.time() - start < TIMEOUT: - time.sleep(1) - if dev.check_frequency(): - break +if args.rflearn: + if args.frequency: + frequency = args.frequency + print("Press the button you want to learn, a short press...") else: - print("RF Frequency not found") - dev.cancel_sweep_frequency() - exit(1) + dev.sweep_frequency() + print("Detecting radiofrequency, press and hold the button to learn...") + + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + locked, frequency = dev.check_frequency() + if locked: + break + else: + print("Radiofrequency not found") + dev.cancel_sweep_frequency() + exit(1) - print("Found RF Frequency - 1 of 2!") - print("You can now let go of the button") + print("Radiofrequency detected: {}MHz".format(frequency)) + print("You can now let go of the button") - input("Press enter to continue...") + input("Press enter to continue...") - print("To complete learning, single press the button you want to learn") + print("Press the button again, now a short press.") - dev.find_rf_packet() + dev.find_rf_packet(frequency) start = time.time() while time.time() - start < TIMEOUT: @@ -231,7 +237,7 @@ if args.rfscanlearn: print("No data received...") exit(1) - print("Found RF Frequency - 2 of 2!") + print("Packet found!") learned = format_durations(to_microseconds(bytearray(data))) \ if args.durations \ else ''.join(format(x, '02x') for x in bytearray(data)) From d7ed9855b98a74006a5c3309391fc95e6ffe5cc1 Mon Sep 17 00:00:00 2001 From: irsl Date: Tue, 9 Apr 2024 21:06:38 +0200 Subject: [PATCH 08/21] Thermostat: get the 1st decimal place (#772) --- broadlink/climate.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) mode change 100644 => 100755 broadlink/climate.py diff --git a/broadlink/climate.py b/broadlink/climate.py old mode 100644 new mode 100755 index 15aea900..9531268a --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -43,15 +43,28 @@ def send_request(self, request: t.Sequence[int]) -> bytes: return payload[0x02:p_len] + def _room_or_ext_temp_logic(self, payload, base_index): + base_temp = payload[base_index] / 2.0 + add_offset = (payload[4] >> 3) & 1 # should offset be added? + offset_raw_value = (payload[17] >> 4) & 3 # offset value + offset = (offset_raw_value + 1) / 10 if add_offset else 0.0 + return base_temp + offset + + def _room_temp_logic(self, payload): + return self._room_or_ext_temp_logic(payload, 5) + + def _ext_temp_logic(self, payload): + return self._room_or_ext_temp_logic(payload, 18) + def get_temp(self) -> float: """Return the room temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return payload[0x05] / 2.0 + return self._room_temp_logic(payload) def get_external_temp(self) -> float: """Return the external temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return payload[18] / 2.0 + return self._ext_temp_logic(payload) def get_full_status(self) -> dict: """Return the state of the device. @@ -65,7 +78,7 @@ def get_full_status(self) -> dict: data["active"] = (payload[4] >> 4) & 1 data["temp_manual"] = (payload[4] >> 6) & 1 data["heating_cooling"] = (payload[4] >> 7) & 1 - data["room_temp"] = payload[5] / 2.0 + data["room_temp"] = self._room_temp_logic(payload) data["thermostat_temp"] = payload[6] / 2.0 data["auto_mode"] = payload[7] & 0xF data["loop_mode"] = payload[7] >> 4 @@ -80,7 +93,7 @@ def get_full_status(self) -> dict: data["fre"] = payload[15] data["poweron"] = payload[16] data["unknown"] = payload[17] - data["external_temp"] = payload[18] / 2.0 + data["external_temp"] = self._ext_temp_logic(payload) data["hour"] = payload[19] data["min"] = payload[20] data["sec"] = payload[21] @@ -187,7 +200,9 @@ def set_temp(self, temp: float) -> None: # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. # heating_cooling: heating(0) cooling(1) - def set_power(self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0) -> None: + def set_power( + self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0 + ) -> None: """Set the power state of the device.""" state = (heating_cooling << 7) + power self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state]) From 06c91ae3943423ebc119a4627fd46efb11d68bfa Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Tue, 9 Apr 2024 16:14:04 -0300 Subject: [PATCH 09/21] Remove auxiliary functions from hysen class (#780) --- broadlink/climate.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/broadlink/climate.py b/broadlink/climate.py index 9531268a..c04e65c0 100755 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -43,28 +43,22 @@ def send_request(self, request: t.Sequence[int]) -> bytes: return payload[0x02:p_len] - def _room_or_ext_temp_logic(self, payload, base_index): + def _decode_temp(self, payload, base_index): base_temp = payload[base_index] / 2.0 add_offset = (payload[4] >> 3) & 1 # should offset be added? offset_raw_value = (payload[17] >> 4) & 3 # offset value offset = (offset_raw_value + 1) / 10 if add_offset else 0.0 return base_temp + offset - def _room_temp_logic(self, payload): - return self._room_or_ext_temp_logic(payload, 5) - - def _ext_temp_logic(self, payload): - return self._room_or_ext_temp_logic(payload, 18) - def get_temp(self) -> float: """Return the room temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return self._room_temp_logic(payload) + return self._decode_temp(payload, 5) def get_external_temp(self) -> float: """Return the external temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return self._ext_temp_logic(payload) + return self._decode_temp(payload, 18) def get_full_status(self) -> dict: """Return the state of the device. @@ -78,7 +72,7 @@ def get_full_status(self) -> dict: data["active"] = (payload[4] >> 4) & 1 data["temp_manual"] = (payload[4] >> 6) & 1 data["heating_cooling"] = (payload[4] >> 7) & 1 - data["room_temp"] = self._room_temp_logic(payload) + data["room_temp"] = self._decode_temp(payload, 5) data["thermostat_temp"] = payload[6] / 2.0 data["auto_mode"] = payload[7] & 0xF data["loop_mode"] = payload[7] >> 4 @@ -93,7 +87,7 @@ def get_full_status(self) -> dict: data["fre"] = payload[15] data["poweron"] = payload[16] data["unknown"] = payload[17] - data["external_temp"] = self._ext_temp_logic(payload) + data["external_temp"] = self._decode_temp(payload, 18) data["hour"] = payload[19] data["min"] = payload[20] data["sec"] = payload[21] From c6bf96da470d9c70646492d06f2a1c61c11056fb Mon Sep 17 00:00:00 2001 From: Hozoy Date: Wed, 10 Apr 2024 06:23:35 +0800 Subject: [PATCH 10/21] Add mp1s get_status function (#762) --- broadlink/switch.py | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/broadlink/switch.py b/broadlink/switch.py index a2c15be2..e60f124a 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -367,11 +367,40 @@ class mp1s(mp1): TYPE = "MP1S" - def get_energy(self) -> float: - """Return the power consumption in W.""" - packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) + def get_status(self) -> dict: + """ + Return the power state of the device. + voltage in V. + current in A. + power in W. + power consumption in kW·h. + """ + packet = bytearray(16) + packet[0x00] = 0x0E + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + packet[0x0A] = 0x04 + response = self.send_packet(0x6A, packet) e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - energy = payload[0x7:0x4:-1].hex() - return int(energy) / 100 + payload_str = payload.hex()[4:-6] + + def get_value(start, end, factors): + value = sum(int(payload_str[i-2:i]) * factor for i, + factor in zip(range(start, end, -2), factors)) + return value + + return { + 'voltage': get_value(34, 30, [10, 0.1]), + 'current': get_value(40, 34, [1, 0.01, 0.0001]), + 'power': get_value(46, 40, [100, 1, 0.01]), + 'power_consumption': get_value(54, 46, [10000, 100, 1, 0.01]) + } + + From cacebe7f3c5d1ea0317cf11930350b9352b96bfa Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Tue, 9 Apr 2024 19:43:29 -0300 Subject: [PATCH 11/21] Rename MP1S state parameters (#783) * Rename MP1S state parameters * Rename get_status to get_state --- broadlink/__init__.py | 8 ++++---- broadlink/switch.py | 24 ++++++++++++------------ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 596a8ec1..a2bd5eac 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -160,7 +160,7 @@ 0x60C8: ("LB1", "Broadlink"), 0x6112: ("LB1", "Broadlink"), 0x644B: ("LB1", "Broadlink"), - 0x644C: ("LB27 R1", "Broadlink"), + 0x644C: ("LB27 R1", "Broadlink"), 0x644E: ("LB26 R1", "Broadlink"), }, lb2: { @@ -170,9 +170,9 @@ S1C: { 0x2722: ("S2KIT", "Broadlink"), }, - s3: { - 0xA59C:("S3", "Broadlink"), - 0xA64D:("S3", "Broadlink"), + s3: { + 0xA59C: ("S3", "Broadlink"), + 0xA64D: ("S3", "Broadlink"), }, hysen: { 0x4EAD: ("HY02/HY03", "Hysen"), diff --git a/broadlink/switch.py b/broadlink/switch.py index e60f124a..1a94de6b 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -367,9 +367,9 @@ class mp1s(mp1): TYPE = "MP1S" - def get_status(self) -> dict: - """ - Return the power state of the device. + def get_state(self) -> dict: + """Return the power state of the device. + voltage in V. current in A. power in W. @@ -392,15 +392,15 @@ def get_status(self) -> dict: payload_str = payload.hex()[4:-6] def get_value(start, end, factors): - value = sum(int(payload_str[i-2:i]) * factor for i, - factor in zip(range(start, end, -2), factors)) + value = sum( + int(payload_str[i - 2 : i]) * factor + for i, factor in zip(range(start, end, -2), factors) + ) return value - + return { - 'voltage': get_value(34, 30, [10, 0.1]), - 'current': get_value(40, 34, [1, 0.01, 0.0001]), - 'power': get_value(46, 40, [100, 1, 0.01]), - 'power_consumption': get_value(54, 46, [10000, 100, 1, 0.01]) + "volt": get_value(34, 30, [10, 0.1]), + "current": get_value(40, 34, [1, 0.01, 0.0001]), + "power": get_value(46, 40, [100, 1, 0.01]), + "totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]), } - - From 821820c61e16e3daaa3f5ced18916d1236cf5587 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Tue, 9 Apr 2024 19:56:30 -0300 Subject: [PATCH 12/21] Add support for BG Electrical EHC31 (0x6480) (#784) --- broadlink/__init__.py | 5 +++- broadlink/switch.py | 56 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index a2bd5eac..3f63930d 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -13,7 +13,7 @@ from .light import lb1, lb2 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro from .sensor import a1 -from .switch import bg1, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b +from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b SUPPORTED_TYPES = { sp1: { @@ -183,6 +183,9 @@ bg1: { 0x51E3: ("BG800/BG900", "BG Electrical"), }, + ehc31: { + 0x6480: ("EHC31", "BG Electrical"), + }, } diff --git a/broadlink/switch.py b/broadlink/switch.py index 1a94de6b..bf0d7438 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -306,6 +306,62 @@ def _decode(self, response: bytes) -> dict: return state +class ehc31(bg1): + """Controls a BG Electrical smart extension lead.""" + + TYPE = "EHC31" + + def set_state( + self, + pwr: bool = None, + pwr1: bool = None, + pwr2: bool = None, + pwr3: bool = None, + maxworktime1: int = None, + maxworktime2: int = None, + maxworktime3: int = None, + idcbrightness: int = None, + childlock: bool = None, + childlock1: bool = None, + childlock2: bool = None, + childlock3: bool = None, + childlock4: bool = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if maxworktime3 is not None: + state["maxworktime3"] = maxworktime3 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + if childlock is not None: + state["childlock"] = int(bool(childlock)) + if childlock1 is not None: + state["childlock1"] = int(bool(childlock1)) + if childlock2 is not None: + state["childlock2"] = int(bool(childlock2)) + if childlock3 is not None: + state["childlock3"] = int(bool(childlock3)) + if childlock4 is not None: + state["childlock4"] = int(bool(childlock4)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + class mp1(Device): """Controls a Broadlink MP1.""" From 4766d68289c1bfeab26378d0620646bfca662223 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Tue, 9 Apr 2024 20:32:41 -0300 Subject: [PATCH 13/21] Add support for Dooya DT360E (v2) (#785) --- broadlink/__init__.py | 5 +++- broadlink/cover.py | 62 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 3f63930d..3bfff8e9 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -7,7 +7,7 @@ from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .alarm import S1C from .climate import hysen -from .cover import dooya +from .cover import dooya, dooya2 from .device import Device, ping, scan from .hub import s3 from .light import lb1, lb2 @@ -180,6 +180,9 @@ dooya: { 0x4E4D: ("DT360E-45/20", "Dooya"), }, + dooya2: { + 0x4F6E: ("DT360E-45/20", "Dooya"), + }, bg1: { 0x51E3: ("BG800/BG900", "BG Electrical"), }, diff --git a/broadlink/cover.py b/broadlink/cover.py index c0f08abb..1889c5c3 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -8,15 +8,15 @@ class dooya(Device): """Controls a Dooya curtain motor.""" - TYPE = "Dooya DT360E" + TYPE = "DT360E" - def _send(self, magic1: int, magic2: int) -> int: + def _send(self, command: int, attribute: int = 0) -> int: """Send a packet to the device.""" packet = bytearray(16) packet[0] = 0x09 packet[2] = 0xBB - packet[3] = magic1 - packet[4] = magic2 + packet[3] = command + packet[4] = attribute packet[9] = 0xFA packet[10] = 0x44 response = self.send_packet(0x6A, packet) @@ -26,15 +26,15 @@ def _send(self, magic1: int, magic2: int) -> int: def open(self) -> int: """Open the curtain.""" - return self._send(0x01, 0x00) + return self._send(0x01) def close(self) -> int: """Close the curtain.""" - return self._send(0x02, 0x00) + return self._send(0x02) def stop(self) -> int: """Stop the curtain.""" - return self._send(0x03, 0x00) + return self._send(0x03) def get_percentage(self) -> int: """Return the position of the curtain.""" @@ -55,3 +55,51 @@ def set_percentage_and_wait(self, new_percentage: int) -> None: time.sleep(0.2) current = self.get_percentage() self.stop() + + +class dooya2(Device): + """Controls a Dooya curtain motor (version 2).""" + + TYPE = "DT360E-2" + + def _send(self, command: int, attribute: int = 0) -> int: + """Send a packet to the device.""" + checksum = 0xC0C4 + command + attribute & 0xFFFF + packet = bytearray(32) + packet[0] = 0x16 + packet[2] = 0xA5 + packet[3] = 0xA5 + packet[4] = 0x5A + packet[5] = 0x5A + packet[6] = checksum & 0xFF + packet[7] = checksum >> 8 + packet[8] = 0x02 + packet[9] = 0x0B + packet[10] = 0x0A + packet[15] = command + packet[16] = attribute + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x11] + + def open(self) -> None: + """Open the curtain.""" + self._send(0x01) + + def close(self) -> None: + """Close the curtain.""" + self._send(0x02) + + def stop(self) -> None: + """Stop the curtain.""" + self._send(0x03) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + return self._send(0x06) + + def set_percentage(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + self._send(0x09, new_percentage) From 84af992dccc73b03bf179ff5e0790f272ae3323d Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 10 Apr 2024 16:35:25 -0300 Subject: [PATCH 14/21] Add support for Wistar smart curtain (0x4F6C) (#786) * Add support for Wistar smart curtain (0x4F6C) * Rename wsrc to wser --- broadlink/__init__.py | 5 +- broadlink/cover.py | 138 ++++++++++++++++++++++++++++++++---------- 2 files changed, 110 insertions(+), 33 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 3bfff8e9..070f06db 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -7,7 +7,7 @@ from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .alarm import S1C from .climate import hysen -from .cover import dooya, dooya2 +from .cover import dooya, dooya2, wser from .device import Device, ping, scan from .hub import s3 from .light import lb1, lb2 @@ -183,6 +183,9 @@ dooya2: { 0x4F6E: ("DT360E-45/20", "Dooya"), }, + wser: { + 0x4F6C: ("WSER", "Wistar"), + }, bg1: { 0x51E3: ("BG800/BG900", "BG Electrical"), }, diff --git a/broadlink/cover.py b/broadlink/cover.py index 1889c5c3..23727a86 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -13,15 +13,16 @@ class dooya(Device): def _send(self, command: int, attribute: int = 0) -> int: """Send a packet to the device.""" packet = bytearray(16) - packet[0] = 0x09 - packet[2] = 0xBB - packet[3] = command - packet[4] = attribute - packet[9] = 0xFA - packet[10] = 0x44 - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) + packet[0x00] = 0x09 + packet[0x02] = 0xBB + packet[0x03] = command + packet[0x04] = attribute + packet[0x09] = 0xFA + packet[0x0A] = 0x44 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) return payload[4] def open(self) -> int: @@ -62,44 +63,117 @@ class dooya2(Device): TYPE = "DT360E-2" - def _send(self, command: int, attribute: int = 0) -> int: - """Send a packet to the device.""" - checksum = 0xC0C4 + command + attribute & 0xFFFF - packet = bytearray(32) - packet[0] = 0x16 - packet[2] = 0xA5 - packet[3] = 0xA5 - packet[4] = 0x5A - packet[5] = 0x5A + def _send(self, operation: int, data: bytes): + """Send a command to the device.""" + packet = bytearray(14) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + + packet += bytes(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF packet[6] = checksum & 0xFF packet[7] = checksum >> 8 - packet[8] = 0x02 - packet[9] = 0x0B - packet[10] = 0x0A - packet[15] = command - packet[16] = attribute - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return payload[0x11] + packet_len = len(packet) - 2 + packet[0] = packet_len & 0xFF + packet[1] = packet_len >> 8 + + resp = self.send_packet(0x6a, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload def open(self) -> None: """Open the curtain.""" - self._send(0x01) + self._send(2, [0x00, 0x01, 0x00]) def close(self) -> None: """Close the curtain.""" - self._send(0x02) + self._send(2, [0x00, 0x02, 0x00]) def stop(self) -> None: """Stop the curtain.""" - self._send(0x03) + self._send(2, [0x00, 0x03, 0x00]) def get_percentage(self) -> int: """Return the position of the curtain.""" - return self._send(0x06) + resp = self._send(1, [0x00, 0x06, 0x00]) + return resp[0x11] def set_percentage(self, new_percentage: int) -> None: """Set the position of the curtain.""" - self._send(0x09, new_percentage) + self._send(2, [0x00, 0x09, new_percentage]) + + +class wser(Device): + """Controls a Wistar curtain motor""" + + TYPE = "WSER" + + def _send(self, operation: int, data: bytes): + """Send a command to the device.""" + packet = bytearray(14) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + + packet += bytes(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[6] = checksum & 0xFF + packet[7] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0] = packet_len & 0xFF + packet[1] = packet_len >> 8 + + resp = self.send_packet(0x6a, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def get_position(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, []) + position = resp[0x0E] + return position + + def open(self) -> int: + """Open the curtain.""" + resp = self._send(2, [0x4a, 0x31, 0xa0]) + position = resp[0x0E] + return position + + def close(self) -> int: + """Close the curtain.""" + resp = self._send(2, [0x61, 0x32, 0xa0]) + position = resp[0x0E] + return position + + def stop(self) -> int: + """Stop the curtain.""" + resp = self._send(2, [0x4c, 0x73, 0xa0]) + position = resp[0x0E] + return position + + def set_position(self, position: int) -> int: + """Set the position of the curtain.""" + resp = self._send(2, [position, 0x70, 0xa0]) + position = resp[0x0E] + return position From 247be74c33b533b42a14be98179d9f8b5293d385 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 10 Apr 2024 22:51:41 -0300 Subject: [PATCH 15/21] Expose IR/RF conversion functions (#788) * Move IR duration<->Broadlink conversion down from CLI * Fix --learn base64 to not crash with --durations Also remove its b'...' wrapping. * Fix IR/RF conversions --------- Co-authored-by: William Grant --- broadlink/remote.py | 41 +++++++++++++++++ cli/broadlink_cli | 106 ++++++++++++++++---------------------------- 2 files changed, 79 insertions(+), 68 deletions(-) diff --git a/broadlink/remote.py b/broadlink/remote.py index f4db3d2f..89cb71f5 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -6,6 +6,47 @@ from .device import Device +def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 + + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: + result.append(0) + result.append(div) + result.append(mod) + + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result + + +def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" + result = [] + index = 4 + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] + index += 1 + + if chunk == 0: + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError: + raise ValueError("Malformed data.") + index += 2 + + result.append(int(chunk * tick)) + + return result + + class rmmini(Device): """Controls a Broadlink RM mini 3.""" diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 1083e596..35317ee4 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,68 +1,32 @@ #!/usr/bin/env python3 import argparse import base64 -import codecs import time +import typing as t import broadlink from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError +from broadlink.remote import data_to_pulses, pulses_to_data -TICK = 32.84 TIMEOUT = 30 -IR_TOKEN = 0x26 def auto_int(x): return int(x, 0) -def to_microseconds(bytes): - result = [] - # print bytes[0] # 0x26 = 38for IR - index = 4 - while index < len(bytes): - chunk = bytes[index] - index += 1 - if chunk == 0: - chunk = bytes[index] - chunk = 256 * chunk + bytes[index + 1] - index += 2 - result.append(int(round(chunk * TICK))) - if chunk == 0x0d05: - break - return result - - -def durations_to_broadlink(durations): - result = bytearray() - result.append(IR_TOKEN) - result.append(0) - result.append(len(durations) % 256) - result.append(len(durations) / 256) - for dur in durations: - num = int(round(dur / TICK)) - if num > 255: - result.append(0) - result.append(num / 256) - result.append(num % 256) - return result +def format_pulses(pulses: t.List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) -def format_durations(data): - result = '' - for i in range(0, len(data)): - if len(result) > 0: - result += ' ' - result += ('+' if i % 2 == 0 else '-') + str(data[i]) - return result - - -def parse_durations(str): - result = [] - for s in str.split(): - result.append(abs(int(s))) - return result +def parse_pulses(data: t.List[str]) -> t.List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] parser = argparse.ArgumentParser(fromfile_prefix_chars='@') @@ -112,8 +76,8 @@ if args.joinwifi: if args.convert: data = bytearray.fromhex(''.join(args.data)) - durations = to_microseconds(data) - print(format_durations(durations)) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) if args.temperature: print(dev.check_temperature()) if args.humidity: @@ -125,8 +89,11 @@ if args.sensors: for key in data: print("{} {}".format(key, data[key])) if args.send: - data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ - if args.durations else bytearray.fromhex(''.join(args.data)) + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) dev.send_data(data) if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() @@ -144,17 +111,19 @@ if args.learn or (args.learnfile and not args.rflearn): print("No data received...") exit(1) - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learn: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: " + str(base64.b64encode(decode_hex(learned)[0]))) + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) if args.check: if dev.check_power(): print('* ON *') @@ -238,14 +207,15 @@ if args.rflearn: exit(1) print("Packet found!") - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learnfile is None: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) - if args.learnfile is not None: + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) From eb0f98a410990022156dad3f58d257141224e88a Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 10 Apr 2024 23:15:46 -0300 Subject: [PATCH 16/21] Fix README.md (#789) --- cli/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/README.md b/cli/README.md index 7b40b4cf..b7e48dc9 100644 --- a/cli/README.md +++ b/cli/README.md @@ -97,7 +97,7 @@ broadlink_cli --device @BEDROOM.device --temperature #### Check humidity ``` -broadlink_cli --device @BEDROOM.device --temperature +broadlink_cli --device @BEDROOM.device --humidity ``` ### Smart plugs From 24b9d308b6a6a27be452564ad47075edee50651c Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 10 Apr 2024 23:55:41 -0300 Subject: [PATCH 17/21] Fix s3.get_subdevices() (#790) * Fix s3.get_subdevices() * Fix docstring --- broadlink/hub.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/broadlink/hub.py b/broadlink/hub.py index 07b02e82..cb24dc8d 100644 --- a/broadlink/hub.py +++ b/broadlink/hub.py @@ -12,22 +12,34 @@ class s3(Device): TYPE = "S3" MAX_SUBDEVICES = 8 - def get_subdevices(self) -> list: - """Return the lit of sub devices.""" + def get_subdevices(self, step: int = 5) -> list: + """Return a list of sub devices.""" + total = self.MAX_SUBDEVICES sub_devices = [] - step = 5 + seen = set() + index = 0 - for index in range(0, self.MAX_SUBDEVICES, step): + while index < total: state = {"count": step, "index": index} packet = self._encode(14, state) resp = self.send_packet(0x6A, packet) e.check_error(resp[0x22:0x24]) resp = self._decode(resp) - sub_devices.extend(resp["list"]) - if len(sub_devices) == resp["total"]: + for device in resp["list"]: + did = device["did"] + if did in seen: + continue + + seen.add(did) + sub_devices.append(device) + + total = resp["total"] + if len(seen) >= total: break + index += step + return sub_devices def get_state(self, did: str = None) -> dict: From fa44b54d88a73c2a5fdc320d6fd0528587a2e84b Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Fri, 12 Apr 2024 02:10:06 -0300 Subject: [PATCH 18/21] Add support for Broadlink A2 (#791) * Add support for Broadlink A2 * Add supported type * Fix bugs * Improve device name --- broadlink/__init__.py | 7 +++-- broadlink/cover.py | 58 ++++++++++++++++++------------------ broadlink/sensor.py | 69 ++++++++++++++++++++++++++++++++++--------- 3 files changed, 90 insertions(+), 44 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 070f06db..1d4ffb2a 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -12,7 +12,7 @@ from .hub import s3 from .light import lb1, lb2 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro -from .sensor import a1 +from .sensor import a1, a2 from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b SUPPORTED_TYPES = { @@ -142,7 +142,10 @@ 0x653C: ("RM4 pro", "Broadlink"), }, a1: { - 0x2714: ("e-Sensor", "Broadlink"), + 0x2714: ("A1", "Broadlink"), + }, + a2: { + 0x4F60: ("A2", "Broadlink"), }, mp1: { 0x4EB5: ("MP1-1K4S", "Broadlink"), diff --git a/broadlink/cover.py b/broadlink/cover.py index 23727a86..1d8b41ef 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -63,31 +63,32 @@ class dooya2(Device): TYPE = "DT360E-2" - def _send(self, operation: int, data: bytes): + def _send(self, operation: int, data: bytes = b""): """Send a command to the device.""" - packet = bytearray(14) + packet = bytearray(12) packet[0x02] = 0xA5 packet[0x03] = 0xA5 packet[0x04] = 0x5A packet[0x05] = 0x5A packet[0x08] = operation packet[0x09] = 0x0B - - data_len = len(data) - packet[0x0A] = data_len & 0xFF - packet[0x0B] = data_len >> 8 - packet += bytes(data) + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[6] = checksum & 0xFF - packet[7] = checksum >> 8 + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 packet_len = len(packet) - 2 - packet[0] = packet_len & 0xFF - packet[1] = packet_len >> 8 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 - resp = self.send_packet(0x6a, packet) + resp = self.send_packet(0x6A, packet) e.check_error(resp[0x22:0x24]) payload = self.decrypt(resp[0x38:]) return payload @@ -119,31 +120,32 @@ class wser(Device): TYPE = "WSER" - def _send(self, operation: int, data: bytes): + def _send(self, operation: int, data: bytes = b""): """Send a command to the device.""" - packet = bytearray(14) + packet = bytearray(12) packet[0x02] = 0xA5 packet[0x03] = 0xA5 packet[0x04] = 0x5A packet[0x05] = 0x5A packet[0x08] = operation packet[0x09] = 0x0B - - data_len = len(data) - packet[0x0A] = data_len & 0xFF - packet[0x0B] = data_len >> 8 - packet += bytes(data) + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[6] = checksum & 0xFF - packet[7] = checksum >> 8 + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 packet_len = len(packet) - 2 - packet[0] = packet_len & 0xFF - packet[1] = packet_len >> 8 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 - resp = self.send_packet(0x6a, packet) + resp = self.send_packet(0x6A, packet) e.check_error(resp[0x22:0x24]) payload = self.decrypt(resp[0x38:]) return payload @@ -156,24 +158,24 @@ def get_position(self) -> int: def open(self) -> int: """Open the curtain.""" - resp = self._send(2, [0x4a, 0x31, 0xa0]) + resp = self._send(2, [0x4A, 0x31, 0xA0]) position = resp[0x0E] return position def close(self) -> int: """Close the curtain.""" - resp = self._send(2, [0x61, 0x32, 0xa0]) + resp = self._send(2, [0x61, 0x32, 0xA0]) position = resp[0x0E] return position def stop(self) -> int: """Stop the curtain.""" - resp = self._send(2, [0x4c, 0x73, 0xa0]) + resp = self._send(2, [0x4C, 0x73, 0xA0]) position = resp[0x0E] return position def set_position(self, position: int) -> int: """Set the position of the curtain.""" - resp = self._send(2, [position, 0x70, 0xa0]) + resp = self._send(2, [position, 0x70, 0xA0]) position = resp[0x0E] return position diff --git a/broadlink/sensor.py b/broadlink/sensor.py index 33e7587d..21bae0b9 100644 --- a/broadlink/sensor.py +++ b/broadlink/sensor.py @@ -1,6 +1,4 @@ """Support for sensors.""" -import struct - from . import exceptions as e from .device import Device @@ -29,19 +27,62 @@ def check_sensors(self) -> dict: def check_sensors_raw(self) -> dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - data = payload[0x4:] + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + data = self.decrypt(resp[0x38:]) + + return { + "temperature": data[0x04] + data[0x05] / 10.0, + "humidity": data[0x06] + data[0x07] / 10.0, + "light": data[0x08], + "air_quality": data[0x0A], + "noise": data[0x0C], + } + + +class a2(Device): + """Controls a Broadlink A2.""" + + TYPE = "A2" - temperature = struct.unpack("> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + data = self._send(1) return { - "temperature": temperature, - "humidity": humidity, - "light": data[0x4], - "air_quality": data[0x6], - "noise": data[0x8], + "temperature": data[0x13] * 256 + data[0x14], + "humidity": data[0x15] * 256 + data[0x16], + "pm10": data[0x0D] * 256 + data[0x0E], + "pm2_5": data[0x0F] * 256 + data[0x10], + "pm1": data[0x11] * 256 + data[0x12], } From 1e115586136acb937853aa0384c7288abd82455b Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 17 Apr 2024 02:06:36 -0300 Subject: [PATCH 19/21] Add support for Tornado 16X SQ air conditioner (0x4E2A) (#520) * Add support for Tornado 16X SQ air conditioner * Make Tornado a generic HVAC class * Better names * Clean up IntEnums * Clean up encoders * Fix indexes * Improve set_state() interface * Enumerate presets * Rename state to power in get_ac_info() * Paint it black * Use CRC16 helper class * Remove log messages * Fix bugs * Return state in set_state() --- broadlink/__init__.py | 13 ++- broadlink/climate.py | 262 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 258 insertions(+), 17 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 1d4ffb2a..8af3e4c6 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -6,7 +6,7 @@ from . import exceptions as e from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .alarm import S1C -from .climate import hysen +from .climate import hvac, hysen from .cover import dooya, dooya2, wser from .device import Device, ping, scan from .hub import s3 @@ -177,6 +177,9 @@ 0xA59C: ("S3", "Broadlink"), 0xA64D: ("S3", "Broadlink"), }, + hvac: { + 0x4E2A: ("HVAC", "Licensed manufacturer"), + }, hysen: { 0x4EAD: ("HY02/HY03", "Hysen"), }, @@ -258,7 +261,9 @@ def discover( discover_ip_port: int = DEFAULT_PORT, ) -> t.List[Device]: """Discover devices connected to the local network.""" - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) return [gendevice(*resp) for resp in responses] @@ -272,7 +277,9 @@ def xdiscover( This function returns a generator that yields devices instantly. """ - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) for resp in responses: yield gendevice(*resp) diff --git a/broadlink/climate.py b/broadlink/climate.py index c04e65c0..1a0c6006 100755 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,5 +1,7 @@ -"""Support for HVAC units.""" -import typing as t +"""Support for climate control.""" +import enum +import struct +from typing import List, Sequence from . import exceptions as e from .device import Device @@ -19,7 +21,7 @@ class hysen(Device): TYPE = "HYS" - def send_request(self, request: t.Sequence[int]) -> bytes: + def send_request(self, request: Sequence[int]) -> bytes: """Send a request to the device.""" packet = bytearray() packet.extend((len(request) + 2).to_bytes(2, "little")) @@ -31,15 +33,15 @@ def send_request(self, request: t.Sequence[int]) -> bytes: payload = self.decrypt(response[0x38:]) p_len = int.from_bytes(payload[:0x02], "little") - if p_len + 2 > len(payload): - raise ValueError( - "hysen_response_error", "first byte of response is not length" - ) - - nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") real_crc = CRC16.calculate(payload[0x02:p_len]) + if nom_crc != real_crc: - raise ValueError("hysen_response_error", "CRC check on response failed") + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) return payload[0x02:p_len] @@ -74,7 +76,7 @@ def get_full_status(self) -> dict: data["heating_cooling"] = (payload[4] >> 7) & 1 data["room_temp"] = self._decode_temp(payload, 5) data["thermostat_temp"] = payload[6] / 2.0 - data["auto_mode"] = payload[7] & 0xF + data["auto_mode"] = payload[7] & 0x0F data["loop_mode"] = payload[7] >> 4 data["sensor"] = payload[8] data["osv"] = payload[9] @@ -125,7 +127,9 @@ def get_full_status(self) -> dict: # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) # The sensor command is currently experimental - def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: + def set_mode( + self, auto_mode: int, loop_mode: int, sensor: int = 0 + ) -> None: """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) @@ -206,7 +210,19 @@ def set_power( def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" self.send_request( - [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + [ + 0x01, + 0x10, + 0x00, + 0x08, + 0x00, + 0x02, + 0x04, + hour, + minute, + second, + day + ] ) # Set timer schedule @@ -215,7 +231,7 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None: # {'start_hour':17, 'start_minute':30, 'temp': 22 } # Each one specifies the thermostat temp that will become effective at start_hour:start_minute # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] @@ -238,3 +254,221 @@ def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: request.append(int(weekend[i]["temp"] * 2)) self.send_request(request) + + +class hvac(Device): + """Controls a HVAC. + + Supported models: + - Tornado SMART X SQ series + - Aux ASW-H12U3/JIR1DI-US + - Aux ASW-H36U2/LFR1DI-US + """ + + TYPE = "HVAC" + + @enum.unique + class Mode(enum.IntEnum): + """Enumerates modes.""" + + AUTO = 0 + COOL = 1 + DRY = 2 + HEAT = 3 + FAN = 4 + + @enum.unique + class Speed(enum.IntEnum): + """Enumerates fan speed.""" + + HIGH = 1 + MID = 2 + LOW = 3 + AUTO = 5 + + @enum.unique + class Preset(enum.IntEnum): + """Enumerates presets.""" + + NORMAL = 0 + TURBO = 1 + MUTE = 2 + + @enum.unique + class SwHoriz(enum.IntEnum): + """Enumerates horizontal swing.""" + + ON = 0 + OFF = 7 + + @enum.unique + class SwVert(enum.IntEnum): + """Enumerates vertical swing.""" + + ON = 0 + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 7 + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + packet = bytearray(10) + p_len = 10 + len(data) + struct.pack_into( + " bytes: + """Decode data from transport.""" + # payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(response[0x38:]) + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + d_len = int.from_bytes(payload[0x08:0x0A], "little") + return payload[0x0A:0x0A+d_len] + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a command to the unit.""" + prefix = bytes([((command << 4) | 1), 1]) + packet = self._encode(prefix + data) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response)[0x02:] + + def _parse_state(self, data: bytes) -> dict: + """Parse state.""" + state = {} + state["power"] = bool(data[0x08] & 1 << 5) + state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5 + state["swing_v"] = self.SwVert(data[0x00] & 0b111) + state["swing_h"] = self.SwHoriz(data[0x01] >> 5) + state["mode"] = self.Mode(data[0x05] >> 5) + state["speed"] = self.Speed(data[0x03] >> 5) + state["preset"] = self.Preset(data[0x04] >> 6) + state["sleep"] = bool(data[0x05] & 1 << 2) + state["ifeel"] = bool(data[0x05] & 1 << 3) + state["health"] = bool(data[0x08] & 1 << 1) + state["clean"] = bool(data[0x08] & 1 << 2) + state["display"] = bool(data[0x0A] & 1 << 4) + state["mildew"] = bool(data[0x0A] & 1 << 3) + return state + + def set_state( + self, + power: bool, + target_temp: float, # 16<=target_temp<=32 + mode: Mode, + speed: Speed, + preset: Preset, + swing_h: SwHoriz, + swing_v: SwVert, + sleep: bool, + ifeel: bool, + display: bool, + health: bool, + clean: bool, + mildew: bool, + ) -> dict: + """Set the state of the device.""" + # TODO: decode unknown bits + UNK0 = 0b100 + UNK1 = 0b1101 + UNK2 = 0b101 + + target_temp = round(target_temp * 2) / 2 + + if preset == self.Preset.MUTE: + if mode != self.Mode.FAN: + raise ValueError("mute is only available in fan mode") + speed = self.Speed.LOW + + elif preset == self.Preset.TURBO: + if mode not in {self.Mode.COOL, self.Mode.HEAT}: + raise ValueError("turbo is only available in cooling/heating") + speed = self.Speed.HIGH + + data = bytearray(0x0D) + data[0x00] = (int(target_temp) - 8 << 3) | swing_v + data[0x01] = (swing_h << 5) | UNK0 + data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1 + data[0x03] = speed << 5 + data[0x04] = preset << 6 + data[0x05] = mode << 5 | sleep << 2 | ifeel << 3 + data[0x08] = power << 5 | clean << 2 | (health and 0b11) + data[0x0A] = display << 4 | mildew << 3 + data[0x0C] = UNK2 + + resp = self._send(0, data) + return self._parse_state(resp) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16 dict: + """Returns dictionary with AC info. + + Returns: + dict: + power (bool): power + ambient_temp (float): ambient temperature + """ + resp = self._send(2) + + if len(resp) < 22: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 24 bytes and received {len(resp) + 2}", + ) + + ac_info = {} + ac_info["power"] = resp[0x1] & 1 + + ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111 + if any(ambient_temp): + ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0 + + return ac_info From c4979562c8d7af2559263cd03c8132afa47b417d Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 17 Apr 2024 02:49:13 -0300 Subject: [PATCH 20/21] Fix type hints (#794) --- broadlink/__init__.py | 2 +- broadlink/cover.py | 5 +-- broadlink/device.py | 2 +- broadlink/hub.py | 16 +++++----- broadlink/light.py | 62 +++++++++++++++++++------------------ broadlink/remote.py | 12 ++++---- broadlink/sensor.py | 4 ++- broadlink/switch.py | 71 ++++++++++++++++++++++++------------------- 8 files changed, 95 insertions(+), 79 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 8af3e4c6..080e1cd1 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -269,7 +269,7 @@ def discover( def xdiscover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: str | None = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, ) -> t.Generator[Device, None, None]: diff --git a/broadlink/cover.py b/broadlink/cover.py index 1d8b41ef..75317943 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -1,5 +1,6 @@ """Support for covers.""" import time +from typing import Sequence from . import exceptions as e from .device import Device @@ -63,7 +64,7 @@ class dooya2(Device): TYPE = "DT360E-2" - def _send(self, operation: int, data: bytes = b""): + def _send(self, operation: int, data: Sequence = b""): """Send a command to the device.""" packet = bytearray(12) packet[0x02] = 0xA5 @@ -120,7 +121,7 @@ class wser(Device): TYPE = "WSER" - def _send(self, operation: int, data: bytes = b""): + def _send(self, operation: int, data: Sequence = b""): """Send a command to the device.""" packet = bytearray(12) packet[0x02] = 0xA5 diff --git a/broadlink/device.py b/broadlink/device.py index 3bc9d8aa..287b3542 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -22,7 +22,7 @@ def scan( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: str | None = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, ) -> t.Generator[HelloResponse, None, None]: diff --git a/broadlink/hub.py b/broadlink/hub.py index cb24dc8d..38894090 100644 --- a/broadlink/hub.py +++ b/broadlink/hub.py @@ -42,7 +42,7 @@ def get_subdevices(self, step: int = 5) -> list: return sub_devices - def get_state(self, did: str = None) -> dict: + def get_state(self, did: str | None = None) -> dict: """Return the power state of the device.""" state = {} if did is not None: @@ -55,10 +55,10 @@ def get_state(self, did: str = None) -> dict: def set_state( self, - did: str = None, - pwr1: bool = None, - pwr2: bool = None, - pwr3: bool = None, + did: str | None = None, + pwr1: bool | None = None, + pwr2: bool | None = None, + pwr3: bool | None = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -81,7 +81,9 @@ def _encode(self, flag: int, state: dict) -> bytes: # flag: 1 for reading, 2 for writing. packet = bytearray(12) data = json.dumps(state, separators=(",", ":")).encode() - struct.pack_into(" dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - red: int = None, - blue: int = None, - green: int = None, - brightness: int = None, - colortemp: int = None, - hue: int = None, - saturation: int = None, - transitionduration: int = None, - maxworktime: int = None, - bulb_colormode: int = None, - bulb_scenes: str = None, - bulb_scene: str = None, - bulb_sceneidx: int = None, + pwr: bool | None = None, + red: int | None = None, + blue: int | None = None, + green: int | None = None, + brightness: int | None = None, + colortemp: int | None = None, + hue: int | None = None, + saturation: int | None = None, + transitionduration: int | None = None, + maxworktime: int | None = None, + bulb_colormode: int | None = None, + bulb_scenes: str | None = None, + bulb_scene: str | None = None, + bulb_sceneidx: int | None = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -101,7 +101,7 @@ def _decode(self, response: bytes) -> dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - red: int = None, - blue: int = None, - green: int = None, - brightness: int = None, - colortemp: int = None, - hue: int = None, - saturation: int = None, - transitionduration: int = None, - maxworktime: int = None, - bulb_colormode: int = None, - bulb_scenes: str = None, - bulb_scene: str = None, + pwr: bool | None = None, + red: int | None = None, + blue: int | None = None, + green: int | None = None, + brightness: int | None = None, + colortemp: int | None = None, + hue: int | None = None, + saturation: int | None = None, + transitionduration: int | None = None, + maxworktime: int | None = None, + bulb_colormode: int | None = None, + bulb_scenes: str | None = None, + bulb_scene: str | None = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -183,7 +183,9 @@ def _encode(self, flag: int, state: dict) -> bytes: # flag: 1 for reading, 2 for writing. packet = bytearray(12) data = json.dumps(state, separators=(",", ":")).encode() - struct.pack_into(" dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" None: +def pulses_to_data(pulses: t.List[int], tick: float = 32.84) -> bytes: """Convert a microsecond duration sequence into a Broadlink IR packet.""" result = bytearray(4) result[0x00] = 0x26 @@ -25,7 +25,7 @@ def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None: return result -def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]: +def data_to_pulses(data: bytes, tick: float = 32.84) -> t.List[int]: """Parse a Broadlink packet into a microsecond duration sequence.""" result = [] index = 4 @@ -38,8 +38,8 @@ def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]: if chunk == 0: try: chunk = 256 * data[index] + data[index + 1] - except IndexError: - raise ValueError("Malformed data.") + except IndexError as err: + raise ValueError("Malformed data.") from err index += 2 result.append(int(chunk * tick)) @@ -95,7 +95,7 @@ def check_frequency(self) -> t.Tuple[bool, float]: frequency = struct.unpack(" None: + def find_rf_packet(self, frequency: float | None = None) -> None: """Enter radiofrequency learning mode.""" payload = bytearray() if frequency: @@ -129,7 +129,7 @@ def _send(self, command: int, data: bytes = b"") -> bytes: e.check_error(resp[0x22:0x24]) payload = self.decrypt(resp[0x38:]) p_len = struct.unpack(" None: def set_state( self, - pwr: bool = None, - ntlight: bool = None, - indicator: bool = None, - ntlbrightness: int = None, - maxworktime: int = None, - childlock: bool = None, + pwr: bool | None = None, + ntlight: bool | None = None, + indicator: bool | None = None, + ntlbrightness: int | None = None, + maxworktime: int | None = None, + childlock: bool | None = None, ) -> dict: """Set state of device.""" state = {} @@ -186,7 +186,7 @@ def _decode(self, response: bytes) -> dict: e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - pwr1: bool = None, - pwr2: bool = None, - maxworktime: int = None, - maxworktime1: int = None, - maxworktime2: int = None, - idcbrightness: int = None, + pwr: bool | None = None, + pwr1: bool | None = None, + pwr2: bool | None = None, + maxworktime: int | None = None, + maxworktime1: int | None = None, + maxworktime2: int | None = None, + idcbrightness: int | None = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -291,7 +291,16 @@ def _encode(self, flag: int, state: dict) -> bytes: data = json.dumps(state).encode() length = 12 + len(data) struct.pack_into( - " dict: """Decode a message.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: """Set the power state of the device.""" state = {} @@ -449,7 +458,7 @@ def get_state(self) -> dict: def get_value(start, end, factors): value = sum( - int(payload_str[i - 2 : i]) * factor + int(payload_str[i-2:i]) * factor for i, factor in zip(range(start, end, -2), factors) ) return value From 0a9acab2b80306166cddbb8d94c09ae788735b66 Mon Sep 17 00:00:00 2001 From: Felipe Martins Diel <41558831+felipediel@users.noreply.github.com> Date: Wed, 17 Apr 2024 03:20:13 -0300 Subject: [PATCH 21/21] Make type hints compatible with Python 3.6 (#797) --- broadlink/__init__.py | 14 +++++------ broadlink/device.py | 12 +++++----- broadlink/helpers.py | 8 +++---- broadlink/hub.py | 11 +++++---- broadlink/light.py | 55 ++++++++++++++++++++++--------------------- broadlink/remote.py | 10 ++++---- broadlink/switch.py | 53 +++++++++++++++++++++-------------------- cli/broadlink_cli | 6 ++--- 8 files changed, 86 insertions(+), 83 deletions(-) diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 98d04d92..d3135501 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """The python-broadlink library.""" import socket -import typing as t +from typing import Generator, List, Optional, Tuple, Union from . import exceptions as e from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT @@ -212,8 +212,8 @@ def gendevice( dev_type: int, - host: t.Tuple[str, int], - mac: t.Union[bytes, str], + host: Tuple[str, int], + mac: Union[bytes, str], name: str = "", is_locked: bool = False, ) -> Device: @@ -265,10 +265,10 @@ def hello( def discover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.List[Device]: +) -> List[Device]: """Discover devices connected to the local network.""" responses = scan( timeout, local_ip_address, discover_ip_address, discover_ip_port @@ -278,10 +278,10 @@ def discover( def xdiscover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str | None = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.Generator[Device, None, None]: +) -> Generator[Device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. diff --git a/broadlink/device.py b/broadlink/device.py index 287b3542..5a10bc01 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -3,7 +3,7 @@ import threading import random import time -import typing as t +from typing import Generator, Optional, Tuple, Union from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes @@ -17,15 +17,15 @@ ) from .protocol import Datetime -HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool] +HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] def scan( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str | None = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.Generator[HelloResponse, None, None]: +) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -100,8 +100,8 @@ class Device: def __init__( self, - host: t.Tuple[str, int], - mac: t.Union[bytes, str], + host: Tuple[str, int], + mac: Union[bytes, str], devtype: int, timeout: int = DEFAULT_TIMEOUT, name: str = "", diff --git a/broadlink/helpers.py b/broadlink/helpers.py index 6ee54991..e7b3d4c9 100644 --- a/broadlink/helpers.py +++ b/broadlink/helpers.py @@ -1,5 +1,5 @@ """Helper functions and classes.""" -import typing as t +from typing import Dict, List, Sequence class CRC16: @@ -8,10 +8,10 @@ class CRC16: CRC tables are cached for performance. """ - _cache: t.Dict[int, t.List[int]] = {} + _cache: Dict[int, List[int]] = {} @classmethod - def get_table(cls, polynomial: int) -> t.List[int]: + def get_table(cls, polynomial: int) -> List[int]: """Return the CRC-16 table for a polynomial.""" try: crc_table = cls._cache[polynomial] @@ -31,7 +31,7 @@ def get_table(cls, polynomial: int) -> t.List[int]: @classmethod def calculate( cls, - sequence: t.Sequence[int], + sequence: Sequence[int], polynomial: int = 0xA001, # CRC-16-ANSI. init_value: int = 0xFFFF, ) -> int: diff --git a/broadlink/hub.py b/broadlink/hub.py index 38894090..0fd4ae53 100644 --- a/broadlink/hub.py +++ b/broadlink/hub.py @@ -1,6 +1,7 @@ """Support for hubs.""" import struct import json +from typing import Optional from . import exceptions as e from .device import Device @@ -42,7 +43,7 @@ def get_subdevices(self, step: int = 5) -> list: return sub_devices - def get_state(self, did: str | None = None) -> dict: + def get_state(self, did: Optional[str] = None) -> dict: """Return the power state of the device.""" state = {} if did is not None: @@ -55,10 +56,10 @@ def get_state(self, did: str | None = None) -> dict: def set_state( self, - did: str | None = None, - pwr1: bool | None = None, - pwr2: bool | None = None, - pwr3: bool | None = None, + did: Optional[str] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + pwr3: Optional[bool] = None, ) -> dict: """Set the power state of the device.""" state = {} diff --git a/broadlink/light.py b/broadlink/light.py index 0dd0d206..1ae87e8f 100644 --- a/broadlink/light.py +++ b/broadlink/light.py @@ -2,6 +2,7 @@ import enum import json import struct +from typing import Optional from . import exceptions as e from .device import Device @@ -32,20 +33,20 @@ def get_state(self) -> dict: def set_state( self, - pwr: bool | None = None, - red: int | None = None, - blue: int | None = None, - green: int | None = None, - brightness: int | None = None, - colortemp: int | None = None, - hue: int | None = None, - saturation: int | None = None, - transitionduration: int | None = None, - maxworktime: int | None = None, - bulb_colormode: int | None = None, - bulb_scenes: str | None = None, - bulb_scene: str | None = None, - bulb_sceneidx: int | None = None, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + bulb_sceneidx: Optional[int] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -130,19 +131,19 @@ def get_state(self) -> dict: def set_state( self, - pwr: bool | None = None, - red: int | None = None, - blue: int | None = None, - green: int | None = None, - brightness: int | None = None, - colortemp: int | None = None, - hue: int | None = None, - saturation: int | None = None, - transitionduration: int | None = None, - maxworktime: int | None = None, - bulb_colormode: int | None = None, - bulb_scenes: str | None = None, - bulb_scene: str | None = None, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, ) -> dict: """Set the power state of the device.""" state = {} diff --git a/broadlink/remote.py b/broadlink/remote.py index 64b7c35d..60c54ce2 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -1,12 +1,12 @@ """Support for universal remotes.""" import struct -import typing as t +from typing import List, Optional, Tuple from . import exceptions as e from .device import Device -def pulses_to_data(pulses: t.List[int], tick: float = 32.84) -> bytes: +def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes: """Convert a microsecond duration sequence into a Broadlink IR packet.""" result = bytearray(4) result[0x00] = 0x26 @@ -25,7 +25,7 @@ def pulses_to_data(pulses: t.List[int], tick: float = 32.84) -> bytes: return result -def data_to_pulses(data: bytes, tick: float = 32.84) -> t.List[int]: +def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]: """Parse a Broadlink packet into a microsecond duration sequence.""" result = [] index = 4 @@ -88,14 +88,14 @@ def sweep_frequency(self) -> None: """Sweep frequency.""" self._send(0x19) - def check_frequency(self) -> t.Tuple[bool, float]: + def check_frequency(self) -> Tuple[bool, float]: """Return True if the frequency was identified successfully.""" resp = self._send(0x1A) is_found = bool(resp[0]) frequency = struct.unpack(" None: + def find_rf_packet(self, frequency: Optional[float] = None) -> None: """Enter radiofrequency learning mode.""" payload = bytearray() if frequency: diff --git a/broadlink/switch.py b/broadlink/switch.py index a49cb1c0..8393f6b1 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -1,6 +1,7 @@ """Support for switches.""" import json import struct +from typing import Optional from . import exceptions as e from .device import Device @@ -127,12 +128,12 @@ def set_nightlight(self, ntlight: bool) -> None: def set_state( self, - pwr: bool | None = None, - ntlight: bool | None = None, - indicator: bool | None = None, - ntlbrightness: int | None = None, - maxworktime: int | None = None, - childlock: bool | None = None, + pwr: Optional[bool] = None, + ntlight: Optional[bool] = None, + indicator: Optional[bool] = None, + ntlbrightness: Optional[int] = None, + maxworktime: Optional[int] = None, + childlock: Optional[bool] = None, ) -> dict: """Set state of device.""" state = {} @@ -255,13 +256,13 @@ def get_state(self) -> dict: def set_state( self, - pwr: bool | None = None, - pwr1: bool | None = None, - pwr2: bool | None = None, - maxworktime: int | None = None, - maxworktime1: int | None = None, - maxworktime2: int | None = None, - idcbrightness: int | None = None, + pwr: Optional[bool] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + maxworktime: Optional[int] = None, + maxworktime1: Optional[int] = None, + maxworktime2: Optional[int] = None, + idcbrightness: Optional[int] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -322,19 +323,19 @@ class ehc31(bg1): def set_state( self, - pwr: bool | None = None, - pwr1: bool | None = None, - pwr2: bool | None = None, - pwr3: bool | None = None, - maxworktime1: int | None = None, - maxworktime2: int | None = None, - maxworktime3: int | None = None, - idcbrightness: int | None = None, - childlock: bool | None = None, - childlock1: bool | None = None, - childlock2: bool | None = None, - childlock3: bool | None = None, - childlock4: bool | None = None, + pwr: Optional[bool] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + pwr3: Optional[bool] = None, + maxworktime1: Optional[int] = None, + maxworktime2: Optional[int] = None, + maxworktime3: Optional[int] = None, + idcbrightness: Optional[int] = None, + childlock: Optional[bool] = None, + childlock1: Optional[bool] = None, + childlock2: Optional[bool] = None, + childlock3: Optional[bool] = None, + childlock4: Optional[bool] = None, ) -> dict: """Set the power state of the device.""" state = {} diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 35317ee4..7913e332 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -2,7 +2,7 @@ import argparse import base64 import time -import typing as t +from typing import List import broadlink from broadlink.const import DEFAULT_PORT @@ -16,7 +16,7 @@ def auto_int(x): return int(x, 0) -def format_pulses(pulses: t.List[int]) -> str: +def format_pulses(pulses: List[int]) -> str: """Format pulses.""" return " ".join( f"+{pulse}" if i % 2 == 0 else f"-{pulse}" @@ -24,7 +24,7 @@ def format_pulses(pulses: t.List[int]) -> str: ) -def parse_pulses(data: t.List[str]) -> t.List[int]: +def parse_pulses(data: List[str]) -> List[int]: """Parse pulses.""" return [abs(int(s)) for s in data]