Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Local Mode #1230

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion custom_components/sonoff/core/ewelink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async def send(
ignored if params empty
:param timeout_lan: optional custom LAN timeout
"""
seq = self.sequence()
seq = await self.sequence()

if "parent" in device:
main_device = device["parent"]
Expand Down
16 changes: 9 additions & 7 deletions custom_components/sonoff/core/ewelink/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,22 @@ class XDevice(TypedDict, total=False):
class XRegistryBase:
dispatcher: dict[str, list[Callable]] = None
_sequence: int = 0
_sequence_lock: asyncio.Lock = asyncio.Lock()

def __init__(self, session: ClientSession):
self.dispatcher = {}
self.session = session

@staticmethod
def sequence() -> str:
async def sequence() -> str:
"""Return sequnce counter in ms. Always unique."""
t = int(time.time()) * 1000
if t > XRegistryBase._sequence:
XRegistryBase._sequence = t
else:
XRegistryBase._sequence += 1
return str(XRegistryBase._sequence)
t = time.time_ns() // 1_000_000
async with XRegistryBase._sequence_lock:
if t > XRegistryBase._sequence:
XRegistryBase._sequence = t
else:
XRegistryBase._sequence += 1
return str(XRegistryBase._sequence)

def dispatcher_connect(self, signal: str, target: Callable) -> Callable:
targets = self.dispatcher.setdefault(signal, [])
Expand Down
2 changes: 1 addition & 1 deletion custom_components/sonoff/core/ewelink/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def send(
self.last_ts = time.time()

if sequence is None:
sequence = self.sequence()
sequence = await self.sequence()
log += sequence

# https://coolkit-technologies.github.io/eWeLink-API/#/en/APICenterV2?id=websocket-update-device-status
Expand Down
29 changes: 27 additions & 2 deletions custom_components/sonoff/core/ewelink/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import asyncio
import base64
import errno
import ipaddress
import json
import logging
Expand Down Expand Up @@ -162,6 +163,7 @@ async def send(
command: str = None,
sequence: str = None,
timeout: int = 5,
cre_retry_counter: int = 10,
):
# known commands for DIY: switch, startup, pulse, sledonline
# other commands: switch, switches, transmit, dimmable, light, fan
Expand All @@ -172,7 +174,7 @@ async def send(
command = next(iter(params))

payload = {
"sequence": sequence or self.sequence(),
"sequence": sequence or await self.sequence(),
"deviceid": device["deviceid"],
"selfApikey": "123",
"data": params or {},
Expand Down Expand Up @@ -231,8 +233,31 @@ async def send(
_LOGGER.debug(f"{log} !! Can't connect: {e}")
return "E#CON"

except aiohttp.ClientOSError as e:
if e.errno != errno.ECONNRESET:
_LOGGER.debug(log, exc_info=e)
return "E#COE" # ClientOSError

# This happens because the device's web server is not multi-threaded
# and can only process one request at a time. Therefore, if the
# device is busy processing another request, it will close the
# connection for the new request and we will get this error.
#
# It appears that the device takes some time to process a new request
# after the previous one was closed, which caused a locking approach
# to not work across different devices. Simply retrying on this error
# a few times seems to fortunately work reliably, so we'll do that.

_LOGGER.debug(f"{log} !! ConnectionResetError")
if cre_retry_counter > 0:
await asyncio.sleep(0.1)
return await self.send(
device, params, command, sequence, timeout, cre_retry_counter - 1
)

return "E#CRE" # ConnectionResetError

except (
aiohttp.ClientOSError,
aiohttp.ServerDisconnectedError,
asyncio.CancelledError,
) as e:
Expand Down