Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

G2-1689 Fix async forward entry setup #15

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 17 additions & 45 deletions custom_components/intellicenter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Pentair IntelliCenter Integration."""
import asyncio
import logging
from typing import Any, Optional

Expand Down Expand Up @@ -99,72 +98,50 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
SYSTEM_TYPE: {MODE_ATTR, VACFLO_ATTR},
}
model = PoolModel(attributes_map)

controller = ModelController(entry.data[CONF_HOST], model, loop=hass.loop)

class Handler(ConnectionHandler):

UPDATE_SIGNAL = DOMAIN + "_UPDATE_" + entry.entry_id
CONNECTION_SIGNAL = DOMAIN + "_CONNECTION_" + entry.entry_id

def started(self, controller):

async def started(self, controller):
_LOGGER.info(f"connected to system: '{controller.systemInfo.propName}'")

for object in controller.model:
_LOGGER.debug(f" loaded {object}")

async def setup_platforms():
"""Set up platforms."""
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_setup(entry, platform)
for platform in PLATFORMS
]
)

# dispatcher.async_dispatcher_send(hass, self.CONNECTION_SIGNAL, True)

hass.async_create_task(setup_platforms())
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

@callback
def reconnected(self, controller):
async def reconnected(self, controller):
"""Handle reconnection from the Pentair system."""
_LOGGER.info(f"reconnected to system: '{controller.systemInfo.propName}'")
dispatcher.async_dispatcher_send(hass, self.CONNECTION_SIGNAL, True)

@callback
def disconnected(self, controller, exc):
async def disconnected(self, controller, exc):
"""Handle updates from the Pentair system."""
_LOGGER.info(
f"disconnected from system: '{controller.systemInfo.propName}'"
)
dispatcher.async_dispatcher_send(hass, self.CONNECTION_SIGNAL, False)

@callback
def updated(self, controller, updates: dict[str, PoolObject]):
async def updated(self, controller, updates: dict[str, PoolObject]):
"""Handle updates from the Pentair system."""
_LOGGER.debug(f"received update for {len(updates)} pool objects")
dispatcher.async_dispatcher_send(hass, self.UPDATE_SIGNAL, updates)

try:

handler = Handler(controller)

await handler.start()

hass.data.setdefault(DOMAIN, {})

hass.data[DOMAIN][entry.entry_id] = handler

# subscribe to Home Assistant STOP event to do some cleanup

async def on_hass_stop(event):
"""Stop push updates when hass stops."""
handler.stop()

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, on_hass_stop)

return True
except ConnectionRefusedError as err:
raise ConfigEntryNotReady from err
Expand All @@ -174,28 +151,23 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload IntelliCenter config entry."""

# Unload entities for this entry/device.
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

all(
await asyncio.gather(
*[
hass.config_entries.async_forward_entry_unload(entry, platform)
for platform in PLATFORMS
]
)
)

# Cleanup
handler = hass.data[DOMAIN].pop(entry.entry_id, None)
if unload_ok:
# Cleanup
handler = hass.data[DOMAIN].pop(entry.entry_id, None)

_LOGGER.info(f"unloading integration {entry.entry_id}")
if handler:
_LOGGER.info(f"unloading integration {entry.entry_id}")
if handler:
handler.stop()

# if it was the last instance of this integration, clear up the DOMAIN entry
if not hass.data[DOMAIN]:
del hass.data[DOMAIN]
# if it was the last instance of this integration, clear up the DOMAIN entry
if not hass.data[DOMAIN]:
del hass.data[DOMAIN]

return True
return True

return False


# -------------------------------------------------------------------------------------
Expand Down
66 changes: 33 additions & 33 deletions custom_components/intellicenter/pyintellicenter/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def __init__(self, host, port=6681, loop=None):
self._transport = None
self._protocol = None

self._diconnectedCallback = None
self._disconnectedCallback = None

self._requests = {}

Expand All @@ -128,11 +128,11 @@ def connection_made(self, protocol, transport):
"""Handle the callback from the protocol."""
_LOGGER.debug(f"Connection established to {self._host}")

def connection_lost(self, exc):
async def connection_lost(self, exc):
"""Handle the callback from the protocol."""
self.stop() # should that be a cleanup instead?
if self._diconnectedCallback:
self._diconnectedCallback(self, exc)
if self._disconnectedCallback:
await self._disconnectedCallback(self, exc)

async def start(self) -> None:
"""Connect to the Pentair system and retrieves some system information."""
Expand Down Expand Up @@ -239,7 +239,7 @@ def getConfiguration(self):
"""Return the current 'configuration' of the system."""
return self.getQuery("GetConfiguration")

def receivedMessage(self, msg_id: str, command: str, response: str, msg: dict):
async def receivedMessage(self, msg_id: str, command: str, response: str, msg: dict):
"""Handle the callback for a incoming message.

msd_id is the id of the incoming message
Expand Down Expand Up @@ -268,11 +268,11 @@ def receivedMessage(self, msg_id: str, command: str, response: str, msg: dict):
else:
_LOGGER.debug(f"ignoring response for msg_id {msg_id}")
elif response is None or response == "200":
self.processMessage(command, msg)
await self.processMessage(command, msg)
else:
_LOGGER.warning(f"CONTROLLER: error {response} : {msg}")

def processMessage(self, command: str, msg):
async def processMessage(self, command: str, msg):
"""Process a notification message."""
pass

Expand Down Expand Up @@ -330,19 +330,19 @@ async def start(self):
# we split them in maximum of 50 attributes (arbitrary but seems to work)
if numAttributes >= 50:
res = await self.sendCmd("RequestParamList", {"objectList": query})
self._applyUpdates(res["objectList"])
await self._applyUpdates(res["objectList"])
query = []
numAttributes = 0
# and issue the remaining elements if any
if query:
res = await self.sendCmd("RequestParamList", {"objectList": query})
self._applyUpdates(res["objectList"])
await self._applyUpdates(res["objectList"])

except Exception as err:
traceback.print_exc()
raise err

def receivedQueryResult(self, queryName: str, answer):
async def receivedQueryResult(self, queryName: str, answer):
"""Handle the result of all 'getQuery' responses."""

# none are used by default
Expand All @@ -351,7 +351,7 @@ def receivedQueryResult(self, queryName: str, answer):

pass

def _applyUpdates(self, changesAsList):
async def _applyUpdates(self, changesAsList):
"""Apply updates received to the model."""

updates = self._model.processUpdates(changesAsList)
Expand All @@ -363,30 +363,30 @@ def _applyUpdates(self, changesAsList):
self._systemInfo.update(updates[systemObjnam])

if updates and self._updatedCallback:
self._updatedCallback(self, updates)
await self._updatedCallback(self, updates)

return updates

def receivedNotifyList(self, changes):
async def receivedNotifyList(self, changes):
"""Handle the notifications from IntelliCenter when tracked objects are modified."""

try:
# apply the changes back to the model
self._applyUpdates(changes)
await self._applyUpdates(changes)

except Exception as err:
_LOGGER.error(f"CONTROLLER: receivedNotifyList {err}")

def receivedWriteParamList(self, changes):
async def receivedWriteParamList(self, changes):
"""Handle the response to a change requested on an object."""

try:
self._applyUpdates(changes)
await self._applyUpdates(changes)

except Exception as err:
_LOGGER.error(f"CONTROLLER: receivedWriteParamList {err}")

def receivedSystemConfig(self, objectList):
async def receivedSystemConfig(self, objectList):
"""Handle the response for a request for objects."""

_LOGGER.debug(
Expand All @@ -396,20 +396,20 @@ def receivedSystemConfig(self, objectList):
# note that here we might create new objects
self.model.addObjects(objectList)

def processMessage(self, command: str, msg):
async def processMessage(self, command: str, msg):
"""Handle the callback for an incoming message."""

_LOGGER.debug(f"CONTROLLER: received {command} response: {msg}")

try:
if command == "SendQuery":
self.receivedQueryResult(msg["queryName"], msg["answer"])
await self.receivedQueryResult(msg["queryName"], msg["answer"])
elif command == "NotifyList":
self.receivedNotifyList(msg["objectList"])
await self.receivedNotifyList(msg["objectList"])
elif command == "WriteParamList":
self.receivedWriteParamList(msg["objectList"][0]["changes"])
await self.receivedWriteParamList(msg["objectList"][0]["changes"])
elif command == "SendParamList":
self.receivedSystemConfig(msg["objectList"])
await self.receivedSystemConfig(msg["objectList"])
else:
_LOGGER.debug(f"no handler for {command}")
except Exception as err:
Expand All @@ -433,7 +433,7 @@ def __init__(self, controller, timeBetweenReconnects=30):

self._timeBetweenReconnects = timeBetweenReconnects

controller._diconnectedCallback = self._diconnectedCallback
controller._disconnectedCallback = self._disconnectedCallback

if hasattr(controller, "_updatedCallback"):
controller._updatedCallback = self.updated
Expand Down Expand Up @@ -469,16 +469,16 @@ async def _starter(self, initialDelay=0):
await self._controller.start()

if self._firstTime:
self.started(self._controller)
await self.started(self._controller)
self._firstTime = False
else:
self.reconnected(self._controller)
await self.reconnected(self._controller)

started = True
self._starterTask = None
except Exception as err:
_LOGGER.error(f"cannot start: {err}")
self.retrying(delay)
await self.retrying(delay)
await asyncio.sleep(delay)
delay = self._next_delay(delay)

Expand All @@ -491,9 +491,9 @@ def stop(self):
self._starterTask = None
self._controller.stop()

def _diconnectedCallback(self, controller, err):
async def _disconnectedCallback(self, controller, err):
"""Handle the disconnection of the underlying controller."""
self.disconnected(controller, err)
await self.disconnected(controller, err)
if not self._stopped:
_LOGGER.error(
f"system disconnected from {self._controller.host} {err if err else ''}"
Expand All @@ -502,34 +502,34 @@ def _diconnectedCallback(self, controller, err):
self._starter(self._timeBetweenReconnects)
)

def started(self, controller):
async def started(self, controller):
"""Handle the first time the controller is started.

further reconnections will trigger reconnected method instead
"""
pass

def retrying(self, delay):
async def retrying(self, delay):
"""Handle the fact that we will retry connection in {delay} seconds."""
_LOGGER.info(f"will attempt to reconnect in {delay}s")

def updated(self, controller: ModelController, updates: dict):
async def updated(self, controller: ModelController, updates: dict):
"""Handle the callback that our underlying system has been modified.

only invoked if the controller has a _updatedCallback attribute
changes is expected to contain the list of modified objects
"""
pass

def disconnected(self, controller, exc):
async def disconnected(self, controller, exc):
"""Handle the controller being disconnected.

exc will contain the underlying exception except if
the hearbeat has been missed, in this case exc is None
"""
pass

def reconnected(self, controller):
async def reconnected(self, controller):
"""Handle the controller being reconnected.

only occurs if the controller was connected before
Expand Down
12 changes: 6 additions & 6 deletions custom_components/intellicenter/pyintellicenter/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ def connection_made(self, transport):
# and notify our controller that we are ready!
self._controller.connection_made(self, transport)

def connection_lost(self, exc):
async def connection_lost(self, exc):
"""Handle the callback for connection lost."""

self._controller.connection_lost(exc)
await self._controller.connection_lost(exc)

def data_received(self, data) -> None:
async def data_received(self, data) -> None:
"""Handle the callback for data received."""

data = data.decode()
Expand All @@ -80,7 +80,7 @@ def data_received(self, data) -> None:
for line in lines:
if line:
# and process each line individually
self.processMessage(line)
await self.processMessage(line)

def sendCmd(self, cmd: str, extra: dict = None) -> str:
"""Send a command and return a generated msg id."""
Expand Down Expand Up @@ -130,7 +130,7 @@ def responseReceived(self) -> None:
if self._out_pending:
self._out_pending -= 1

def processMessage(self, message: str) -> None:
async def processMessage(self, message: str) -> None:
"""Process a given message from IntelliCenter."""

_LOGGER.debug(f"PROTOCOL: processMessage {message}")
Expand Down Expand Up @@ -167,7 +167,7 @@ def processMessage(self, message: str) -> None:
self.responseReceived()

# let's pass our message back to the controller for handling its semantic...
self._controller.receivedMessage(msg_id, command, response, msg)
await self._controller.receivedMessage(msg_id, command, response, msg)

except Exception as err:
_LOGGER.error(f"PROTOCOL: exception while receiving message {err}")
Loading