Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zombie fix (Refresh command 0x12) #491

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions custom_components/localtuya/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,43 @@ async def _make_connection(self):
)
self._interface.add_dps_to_request(self.dps_to_request)

self.debug("Retrieving initial state")
status = await self._interface.status()
if status is None:
raise Exception("Failed to retrieve status")

self.status_updated(status)
except Exception: # pylint: disable=broad-except
self.exception(f"Connect to {self._config_entry[CONF_HOST]} failed")
if self._interface is not None:
await self._interface.close()
self._interface = None

if self._interface is not None:
try:
self.debug("Retrieving initial state")
status = await self._interface.status()
if status is None or not status:
raise Exception("Failed to retrieve status")

self._interface.start_heartbeat()
self.status_updated(status)

except Exception: # pylint: disable=broad-except
try:
if self._interface is not None:
self.debug("Initial state update failed, trying refresh command")
await self._interface.refresh()

self.debug("Refresh completed, retrying initial state")
status = await self._interface.status()
if status is None or not status:
raise Exception("Failed to retrieve status")

self._interface.start_heartbeat()
self.status_updated(status)
else:
raise Exception("Interface went away")
except Exception: # pylint: disable=broad-except
self.exception(f"Refresh/initial status update of {self._config_entry[CONF_HOST]} failed")
if self._interface is not None:
await self._interface.close()
self._interface = None

self._connect_task = None

async def close(self):
Expand Down Expand Up @@ -194,10 +220,14 @@ async def set_dps(self, states):
@callback
def status_updated(self, status):
"""Device updated status."""
self._status.update(status)
if status:
self._status.update(status)

signal = f"localtuya_{self._config_entry[CONF_DEVICE_ID]}"
async_dispatcher_send(self._hass, signal, self._status)
else:
self.debug("Received empty status update (likely for refresh)")

signal = f"localtuya_{self._config_entry[CONF_DEVICE_ID]}"
async_dispatcher_send(self._hass, signal, self._status)

@callback
def disconnected(self):
Expand Down
64 changes: 50 additions & 14 deletions custom_components/localtuya/pytuya/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
SET = "set"
STATUS = "status"
HEARTBEAT = "heartbeat"
REFRESH = "refresh"

PROTOCOL_VERSION_BYTES_31 = b"3.1"
PROTOCOL_VERSION_BYTES_33 = b"3.3"
Expand Down Expand Up @@ -90,14 +91,18 @@
STATUS: {"hexByte": 0x0A, "command": {"gwId": "", "devId": ""}},
SET: {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}},
HEARTBEAT: {"hexByte": 0x09, "command": {}},
REFRESH: {"hexByte": 0x12, "command": {"gwId": "", "devId": "", "uid": "", "t": "", "dpId": ""}},
},
"type_0d": {
STATUS: {"hexByte": 0x0D, "command": {"devId": "", "uid": "", "t": ""}},
SET: {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}},
HEARTBEAT: {"hexByte": 0x09, "command": {}},
REFRESH: {"hexByte": 0x12, "command": {"gwId": "", "devId": "", "uid": "", "t": "", "dpId": ""}},
},
}

REFRESH_IDS = [4, 5, 6, 18, 19, 20]


class TuyaLoggingAdapter(logging.LoggerAdapter):
"""Adapter that adds device id to all log points."""
Expand Down Expand Up @@ -207,9 +212,10 @@ def _unpad(data):
class MessageDispatcher(ContextualLogger):
"""Buffer and dispatcher for Tuya messages."""

# Heartbeats always respond with sequence number 0, so they can't be waited for like
# other messages. This is a hack to allow waiting for heartbeats.
# Heartbeats and refreshes always respond with sequence number 0, so they can't be waited for like
# other messages. This is a hack to allow waiting for them.
HEARTBEAT_SEQNO = -100
REFRESH_SEQNO = -101

def __init__(self, dev_id, listener):
"""Initialize a new MessageBuffer."""
Expand Down Expand Up @@ -292,9 +298,25 @@ def _dispatch(self, msg):
sem = self.listeners[self.HEARTBEAT_SEQNO]
self.listeners[self.HEARTBEAT_SEQNO] = msg
sem.release()
elif msg.cmd == 0x12:
self.debug("Got normal refresh response")
if self.REFRESH_SEQNO in self.listeners:
sem = self.listeners[self.REFRESH_SEQNO]
self.listeners[self.REFRESH_SEQNO] = msg
if isinstance(sem, asyncio.Semaphore):
sem.release()
elif msg.cmd == 0x08:
self.debug("Got status update")
self.listener(msg)
# If we have an open refresh call then this is for it.
# Some devices send 0x12 and 0x08 in response to a refresh.
# Empty DPS responses here are always for refresh but hey we haven't decoded yet to know
if self.REFRESH_SEQNO in self.listeners and isinstance(self.listeners[self.REFRESH_SEQNO], asyncio.Semaphore):
self.debug("Got status type refresh response")
sem = self.listeners[self.REFRESH_SEQNO]
self.listeners[self.REFRESH_SEQNO] = msg
sem.release()
else:
self.debug("Got status update")
self.listener(msg)
else:
self.debug(
"Got message type %d for unknown listener %d: %s",
Expand Down Expand Up @@ -373,6 +395,10 @@ def _status_update(msg):
def connection_made(self, transport):
"""Did connect to the device."""

self.transport = transport
self.on_connected.set_result(True)

def start_heartbeat(self):
async def heartbeat_loop():
"""Continuously send heart beat updates."""
self.debug("Started heartbeat loop")
Expand All @@ -394,8 +420,6 @@ async def heartbeat_loop():
self.transport = None
transport.close()

self.transport = transport
self.on_connected.set_result(True)
self.heartbeater = self.loop.create_task(heartbeat_loop())

def data_received(self, data):
Expand Down Expand Up @@ -440,13 +464,14 @@ async def exchange(self, command, dps=None):
payload = self._generate_payload(command, dps)
dev_type = self.dev_type

# Wait for special sequence number if heartbeat
seqno = (
MessageDispatcher.HEARTBEAT_SEQNO
if command == HEARTBEAT
else (self.seqno - 1)
)

# Wait for special sequence number if heartbeat or refresh
seqno = (self.seqno - 1)

if command == HEARTBEAT:
seqno = MessageDispatcher.HEARTBEAT_SEQNO
elif command == REFRESH:
seqno = MessageDispatcher.REFRESH_SEQNO

self.transport.write(payload)
msg = await self.dispatcher.wait_for(seqno)
if msg is None:
Expand Down Expand Up @@ -478,6 +503,15 @@ async def heartbeat(self):
"""Send a heartbeat message."""
return await self.exchange(HEARTBEAT)

async def refresh(self):
"""Send a refresh message (3.3 only)."""
if self.version == 3.3:
self.dev_type = "type_0a"
self.debug("refresh switching to dev_type %s", self.dev_type)
return await self.exchange(REFRESH)
else:
return True

async def set_dp(self, value, dp_index):
"""
Set value (may be any type: bool, int or string) of any dps index.
Expand Down Expand Up @@ -580,6 +614,8 @@ def _generate_payload(self, command, data=None):
json_data["uid"] = self.id # still use id, no separate uid
if "t" in json_data:
json_data["t"] = str(int(time.time()))
if "dpId" in json_data:
json_data["dpId"] = REFRESH_IDS

if data is not None:
json_data["dps"] = data
Expand All @@ -591,7 +627,7 @@ def _generate_payload(self, command, data=None):

if self.version == 3.3:
payload = self.cipher.encrypt(payload, False)
if command_hb != 0x0A:
if command_hb != 0x0A and command_hb != 0x12:
# add the 3.3 header
payload = PROTOCOL_33_HEADER + payload
elif command == SET:
Expand Down