Skip to content

Commit

Permalink
Improve yandex_station.send_command service
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Oct 13, 2024
1 parent 57fb32c commit bd9dbde
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 63 deletions.
61 changes: 26 additions & 35 deletions custom_components/yandex_station/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import logging

import voluptuous as vol
Expand Down Expand Up @@ -42,7 +41,7 @@
from .core.yandex_glagol import YandexIOListener
from .core.yandex_quasar import YandexQuasar
from .core.yandex_session import YandexSession
from .core.yandex_station import YandexStation
from .core.yandex_station import YandexStationBase
from .hass import hass_utils

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -197,7 +196,7 @@ async def _init_local_discovery(hass: HomeAssistant):
async def found_local_speaker(info: dict):
speaker = speakers.setdefault(info["device_id"], {})
speaker.update(info)
entity: YandexStation = speaker.get("entity")
entity: YandexStationBase = speaker.get("entity")
if entity and entity.hass:
await entity.init_local_mode()
entity.async_write_ha_state()
Expand All @@ -216,39 +215,31 @@ async def _init_services(hass: HomeAssistant):
"""Init Yandex Station TTS service."""
speakers: dict = hass.data[DOMAIN][DATA_SPEAKERS]

async def send_command(call: ServiceCall):
data = dict(call.data)

device = data.pop("device", None)
entity_ids = data.pop(ATTR_ENTITY_ID, None) or utils.find_station(
speakers.values(), device
)

_LOGGER.debug(f"Send command to: {entity_ids}")

if not entity_ids:
_LOGGER.error("Entity_id parameter required")
return

data = (
{
ATTR_ENTITY_ID: entity_ids,
ATTR_MEDIA_CONTENT_ID: data.get("text"),
ATTR_MEDIA_CONTENT_TYPE: "dialog",
}
if data.get("command") == "dialog"
else {
ATTR_ENTITY_ID: entity_ids,
ATTR_MEDIA_CONTENT_ID: json.dumps(data),
ATTR_MEDIA_CONTENT_TYPE: "json",
}
)

await hass.services.async_call(
MEDIA_DOMAIN, SERVICE_PLAY_MEDIA, data, blocking=True
try:
# starting from Home Assistant 2023.7
from homeassistant.core import ServiceResponse, SupportsResponse
from homeassistant.helpers import service

async def send_command(call: ServiceCall) -> ServiceResponse:
selected = service.async_extract_referenced_entity_ids(hass, call)
entity_ids = selected.referenced | selected.indirectly_referenced
for speaker in speakers.values():
entity: YandexStationBase = speaker.get("entity")
if not entity or entity.entity_id not in entity_ids or not entity.glagol:
continue
data = service.remove_entity_service_fields(call)
data.setdefault("command", "sendText")
return await entity.glagol.send(data)
return {"error": "Entity not found"}

hass.services.async_register(
DOMAIN,
"send_command",
send_command,
supports_response=SupportsResponse.OPTIONAL,
)

hass.services.async_register(DOMAIN, "send_command", send_command)
except ImportError as e:
_LOGGER.warning(repr(e))

async def yandex_station_say(call: ServiceCall):
entity_ids = call.data.get(ATTR_ENTITY_ID) or utils.find_station(
Expand Down
57 changes: 29 additions & 28 deletions custom_components/yandex_station/core/yandex_glagol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from asyncio import Future
from typing import Callable, Dict, Optional

from aiohttp import ClientConnectorError, ClientWebSocketResponse, WSMsgType
from aiohttp import ClientConnectorError, ClientWebSocketResponse
from zeroconf import ServiceBrowser, ServiceStateChange, Zeroconf

from .yandex_session import YandexSession
Expand Down Expand Up @@ -99,16 +99,15 @@ async def _connect(self, fails: int):
# self.keep_task = self.loop.create_task(self._keep_connection())

async for msg in self.ws:
if msg.type == WSMsgType.TEXT:
# _LOGGER.debug("update")
# Большая станция в режиме idle шлёт статус раз в 5 секунд,
# в режиме playing шлёт чаще раза в 1 секунду
# self.next_ping_ts = time.time() + 6

# Большая станция в режиме idle шлёт статус раз в 5 секунд,
# в режиме playing шлёт чаще раза в 1 секунду
# self.next_ping_ts = time.time() + 6
data = json.loads(msg.data)

data = json.loads(msg.data)

response = None
request_id = data.get("requestId")
if request_id in self.waiters:
response = {"status": data["status"]}

if resp := data.get("vinsResponse"):
try:
Expand All @@ -117,40 +116,40 @@ async def _connect(self, fails: int):
resp = resp["payload"]

if card := resp["response"].get("card"):
response = card
response.update(card)
elif cards := resp["response"].get("cards"):
response = cards[0]
response.update(cards[0])
elif resp["response"].get("is_streaming"):
response["is_streaming"] = True
else:
response = resp["voice_response"]["output_speech"]
response.update(resp["voice_response"]["output_speech"])

except Exception as e:
_LOGGER.debug(f"Response error: {e}")

request_id = data.get("requestId")
if request_id in self.waiters:
self.waiters[request_id].set_result(response)
self.waiters[request_id].set_result(response)

self.update_handler(data)
self.update_handler(data)

# TODO: find better place
self.device_token = None

except ClientConnectorError as e:
self.debug(f"Ошибка подключения: {e.args}")
except (ClientConnectorError, ConnectionResetError) as e:
self.debug(f"Ошибка подключения: {repr(e)}")
fails += 1

except (asyncio.CancelledError, RuntimeError) as e:
# сюда попадаем при остановке HA
if isinstance(e, RuntimeError):
assert e.args[0] == "Session is closed", e.args
assert e.args[0] == "Session is closed", repr(e)

self.debug(f"Останавливаем подключение: {e}")
self.debug(f"Останавливаем подключение: {repr(e)}")
if self.ws and not self.ws.closed:
await self.ws.close()
return

except:
_LOGGER.exception(f"{self.name} | Station connect")
except Exception as e:
_LOGGER.error(f"{self.name} => local | {repr(e)}")
fails += 1

# возвращаемся в облачный режим
Expand All @@ -161,10 +160,10 @@ async def _connect(self, fails: int):
return

if fails:
# 30s, 60s, ... 5 min
timeout = 30 * min(fails, 10)
self.debug(f"Таймаут до следующего подключения {timeout}")
await asyncio.sleep(timeout)
# 0s, 30s, 60s, ... 5 min
delay = 30 * min(fails - 1, 10)
self.debug(f"Таймаут до следующего подключения {delay}")
await asyncio.sleep(delay)

_ = asyncio.create_task(self._connect(fails))

Expand Down Expand Up @@ -213,11 +212,13 @@ async def send(self, payload: dict) -> Optional[dict]:

return self.waiters.pop(request_id).result()

except asyncio.TimeoutError:
except asyncio.TimeoutError as e:
_ = self.waiters.pop(request_id, None)
return {"error": repr(e)}

except Exception as e:
_LOGGER.error(e)
_LOGGER.error(f"{self.name} => local | {repr(e)}")
return {"error": repr(e)}

async def reset_session(self):
payload = {
Expand Down

0 comments on commit bd9dbde

Please sign in to comment.