Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Home-Assistant: Linking multiple instances via Websocket API #13876

Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ omit =
homeassistant/components/raspihats.py
homeassistant/components/*/raspihats.py

homeassistant/components/remote_homeassistant.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope.

This needs like a 1000 tests added.

homeassistant/components/*/remote_homeassistant.py

homeassistant/components/rfxtrx.py
homeassistant/components/*/rfxtrx.py

Expand Down
2 changes: 1 addition & 1 deletion homeassistant/components/media_player/webostv.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from homeassistant.helpers.script import Script
import homeassistant.util as util

REQUIREMENTS = ['pylgtv==0.1.7', 'websockets==3.2']
REQUIREMENTS = ['pylgtv==0.1.7', 'websockets==4.0.1']

_CONFIGURING = {} # type: Dict[str, str]
_LOGGER = logging.getLogger(__name__)
Expand Down
273 changes: 273 additions & 0 deletions homeassistant/components/remote_homeassistant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
"""
Connect two Home Assistant instances via the Websocket API.

For more details about this component, please refer to the documentation at
https://home-assistant.io/components/remote_homeassistant/
"""

import logging
import json
import copy
import asyncio

import voluptuous as vol

import homeassistant.components.websocket_api as api
from homeassistant.core import EventOrigin, split_entity_id
from homeassistant.helpers.typing import HomeAssistantType, ConfigType
from homeassistant.const import (CONF_HOST, CONF_PORT, EVENT_CALL_SERVICE,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'homeassistant.const.CONF_URL' imported but unused

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'homeassistant.const.CONF_URL' imported but unused

EVENT_HOMEASSISTANT_STOP,
EVENT_STATE_CHANGED, EVENT_SERVICE_REGISTERED)
from homeassistant.remote import JSONEncoder
from homeassistant.config import DATA_CUSTOMIZE
import homeassistant.helpers.config_validation as cv

REQUIREMENTS = ['websockets==4.0.1']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use aiohttp to make the websocket connection as it's already part of the core.


_LOGGER = logging.getLogger(__name__)

CONF_SLAVES = 'slaves'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides slaves being a politically loaded word, it's also the wrong description. We are not delegating work to them, we're integrating them into our instance. I think that we should just call the entry instances

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totally agree, fixed

CONF_SECURE = 'secure'
CONF_API_PASSWORD = 'api_password'
CONF_SUBSCRIBE_EVENTS = 'subscribe_events'
CONF_ENTITY_PREFIX = 'entity_prefix'

DOMAIN = 'remote_homeassistant'

DEFAULT_SUBSCRIBED_EVENTS = [EVENT_STATE_CHANGED,
EVENT_SERVICE_REGISTERED]
DEFAULT_ENTITY_PREFIX = ''

SLAVES_SCHEMA = vol.Schema({
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_PORT, default=8123): cv.port,
vol.Optional(CONF_SECURE, default=False): cv.boolean,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make more sense to have the user just define a url like https://blabla.duckdns.org:1220

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wss://blabla.duckdns.org:1220 .. hopefully users won't confuse ws/wss with http/https?

vol.Optional(CONF_API_PASSWORD): cv.string,
vol.Optional(CONF_SUBSCRIBE_EVENTS,
default=DEFAULT_SUBSCRIBED_EVENTS): cv.ensure_list,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vol.All(cv.ensure_list, [cv.string])

vol.Optional(CONF_ENTITY_PREFIX, default=DEFAULT_ENTITY_PREFIX): cv.string,
})

CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Required(CONF_SLAVES): [SLAVES_SCHEMA],
}),
}, extra=vol.ALLOW_EXTRA)


async def async_setup(hass: HomeAssistantType, config: ConfigType):
"""Set up the remote_homeassistant component."""
conf = config.get(DOMAIN)

for slave in conf.get(CONF_SLAVES):
connection = RemoteConnection(hass, slave)
asyncio.ensure_future(connection.async_connect())

return True


class RemoteConnection(object):
"""A Websocket connection to a remote home-assistant instance."""

def __init__(self, hass, conf):
"""Initialize the connection."""
self._hass = hass
self._host = conf.get(CONF_HOST)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._host = conf[CONF_HOST]

self._port = conf.get(CONF_PORT)
self._password = conf.get(CONF_API_PASSWORD)
self._secure = conf.get(CONF_SECURE)
self._subscribe_events = conf.get(CONF_SUBSCRIBE_EVENTS)
self._entity_prefix = conf.get(CONF_ENTITY_PREFIX)

self._connection = None
self._entities = set()
self._handlers = {}

self.__id = 1

async def async_connect(self):
"""Connect to remote home-assistant websocket..."""
import websockets

url = '%s://%s:%s/api/websocket' % (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use new style string formatting: https://pyformat.info/.

Maybe break out url string into constant?

'wss' if self._secure else 'ws', self._host, self._port)

while True:
try:
_LOGGER.info('Connecting to %s', url)
self._connection = await websockets.connect(url)
except ConnectionError:
_LOGGER.error(
'Could not connect to %s, retry in 10 seconds...', url)
await asyncio.sleep(10)
else:
_LOGGER.info(
'Connected to home-assistant websocket at %s', url)
break

def stop():
"""Close connection."""
if self._connection.connected:
self._connection.close()

self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop)

asyncio.ensure_future(self._recv())

def _next_id(self):
_id = self.__id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring.

self.__id += 1
return _id

async def _call(self, callback, message_type, **extra_args):
import websockets

_id = self._next_id()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add docstring.

self._handlers[_id] = callback
try:
await self._connection.send(json.dumps(
{'id': _id, 'type': message_type, **extra_args}))
except websockets.exceptions.ConnectionClosed:
_LOGGER.error('remote websocket connection closed')
await self._disconnected()
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary return


async def _disconnected(self):
# Remove all published entries
for entity in self._entities:
self._hass.states.async_remove(entity)
self._entities = set()
asyncio.ensure_future(self.async_connect())

async def _recv(self):
import websockets

while True:
try:
data = await self._connection.recv()
except websockets.exceptions.ConnectionClosed:
_LOGGER.error('remote websocket connection closed')
await self._disconnected()
return

if not data:
return

message = json.loads(data)
_LOGGER.debug('received: %s', message)

if message['type'] == api.TYPE_AUTH_OK:
await self._init()

elif message['type'] == api.TYPE_AUTH_REQUIRED:
if not self._password:
_LOGGER.error('Password required, but not provided')
return
data = {'type': api.TYPE_AUTH, 'api_password': self._password}
await self._connection.send(json.dumps(data, cls=JSONEncoder))

elif message['type'] == api.TYPE_AUTH_INVALID:
_LOGGER.error('Auth invalid, check your API password')
await self._connection.close()
return

else:
callback = self._handlers.get(message['id'])
if callback is not None:
callback(message)

async def _init(self):
async def forward_event(event):
"""Send local event to remote instance.

The affected entity_id has to origin from that remote instance,
otherwise the event is dicarded.
"""
event_data = event.data
service_data = event_data['service_data']
entity_ids = service_data.get('entity_id', None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None is the default returned value if the key is missing when using dict.get.


if not entity_ids:
return

if isinstance(entity_ids, str):
entity_ids = (entity_ids.lower(),)

entity_ids = self._entities.intersection(entity_ids)

if not entity_ids:
return

if self._entity_prefix:
def _remove_prefix(entity_id):
domain, object_id = split_entity_id(entity_id)
object_id = object_id.replace(self._entity_prefix, '', 1)
return domain + '.' + object_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use new style string formatting.

entity_ids = {_remove_prefix(entity_id)
for entity_id in entity_ids}

event_data = copy.deepcopy(event_data)
event_data['service_data']['entity_id'] = list(entity_ids)

# Remove service_call_id parameter - websocket API
# doesn't accept that one
event_data.pop('service_call_id', None)

_id = self._next_id()
data = {
'id': _id,
'type': event.event_type,
**event_data
}

_LOGGER.debug('forward event: %s', data)

await self._connection.send(json.dumps(data, cls=JSONEncoder))

def state_changed(entity_id, state, attr):
"""Publish remote state change on local instance."""
if self._entity_prefix:
domain, object_id = split_entity_id(entity_id)
object_id = self._entity_prefix + object_id
entity_id = domain + '.' + object_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use new style string formatting.


# Add local customization data
if DATA_CUSTOMIZE in self._hass.data:
attr.update(self._hass.data[DATA_CUSTOMIZE].get(entity_id))

self._entities.add(entity_id)
self._hass.states.async_set(entity_id, state, attr)

def fire_event(message):
"""Publish remove event on local instance."""
if message['type'] == 'result':
pass
elif message['type'] == 'event':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use guard clauses to make the code more readable.

elif message['type'] != 'event':
    return

if message

if message['event']['event_type'] == 'state_changed':
entity_id = message['event']['data']['entity_id']
state = message['event']['data']['new_state']['state']
attr = message['event']['data']['new_state']['attributes']
state_changed(entity_id, state, attr)
else:
event = message['event']
self._hass.bus.async_fire(
event_type=event['event_type'],
event_data=event['data'],
origin=EventOrigin.remote
)

def got_states(message):
"""Called when list of remote states is available."""
for entity in message['result']:
entity_id = entity['entity_id']
state = entity['state']
attributes = entity['attributes']

state_changed(entity_id, state, attributes)

self._hass.bus.async_listen(EVENT_CALL_SERVICE, forward_event)

for event in self._subscribe_events:
await self._call(fire_event, 'subscribe_events', event_type=event)

await self._call(got_states, 'get_states')
2 changes: 1 addition & 1 deletion homeassistant/components/spc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv

REQUIREMENTS = ['websockets==3.2']
REQUIREMENTS = ['websockets==4.0.1']

_LOGGER = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1313,9 +1313,10 @@ waterfurnace==0.4.0
# homeassistant.components.media_player.gpmdp
websocket-client==0.37.0

# homeassistant.components.remote_homeassistant
# homeassistant.components.spc
# homeassistant.components.media_player.webostv
websockets==3.2
websockets==4.0.1

# homeassistant.components.zigbee
xbee-helper==0.0.7
Expand Down