Skip to content

Commit

Permalink
supress http errors to not break device initialization flow (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
maciej-or authored Apr 13, 2024
1 parent 56d6ecf commit 4cec164
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 123 deletions.
4 changes: 2 additions & 2 deletions custom_components/hikvision_next/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

import asyncio
import contextlib
from contextlib import suppress
import logging

from httpx import TimeoutException
Expand Down Expand Up @@ -112,7 +112,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
# Reset alarm server after it has been set
if config[DATA_SET_ALARM_SERVER]:
isapi = config[DATA_ISAPI]
with contextlib.contextlib(Exception):
with suppress(Exception):
await isapi.set_alarm_server("http://0.0.0.0:80", "/")

if unload_ok:
Expand Down
199 changes: 78 additions & 121 deletions custom_components/hikvision_next/isapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import asyncio
from contextlib import suppress
from dataclasses import dataclass, field
import datetime
from functools import reduce
Expand Down Expand Up @@ -184,8 +185,7 @@ def __init__(self, host: str, username: str, password: str) -> None:

async def get_device_info(self):
"""Get device info."""
hw_info = (await self.isapi.System.deviceInfo(method=GET)).get("DeviceInfo", {})
_LOGGER.debug("%s/ISAPI/System/deviceInfo %s", self.isapi.host, hw_info)
hw_info = (await self.request(GET, "System/deviceInfo")).get("DeviceInfo", {})
self.device_info = HikDeviceInfo(
name=hw_info.get("deviceName"),
manufacturer=str(hw_info.get("manufacturer", "Hikvision")).title(),
Expand All @@ -203,8 +203,7 @@ async def get_hardware_info(self):
await self.get_device_info()

# Get device capabilities
capabilities = (await self.isapi.System.capabilities(method=GET)).get("DeviceCap", {})
_LOGGER.debug("%s/ISAPI/System/capabilities %s", self.isapi.host, capabilities)
capabilities = (await self.request(GET, "System/capabilities")).get("DeviceCap", {})

# Get all supported events to reduce isapi queries
self.supported_events = await self.get_supported_events_info()
Expand All @@ -218,7 +217,8 @@ async def get_hardware_info(self):
self.device_info.input_ports = int(deep_get(capabilities, "SysCap.IOCap.IOInputPortNums", 0))
self.device_info.output_ports = int(deep_get(capabilities, "SysCap.IOCap.IOOutputPortNums", 0))

self.device_info.storage = await self.get_storage_devices()
with suppress(Exception):
self.device_info.storage = await self.get_storage_devices()
self.device_info.supported_events = await self.get_device_event_capabilities(
self.supported_events, self.device_info.serial_no, 0
)
Expand Down Expand Up @@ -258,21 +258,14 @@ async def get_cameras(self):
else:
# Get analog and digital cameras attached to NVR
if self.device_info.support_digital_cameras > 0:
digital_cameras = (
(await self.isapi.ContentMgmt.InputProxy.channels(method=GET))
.get("InputProxyChannelList", {})
.get("InputProxyChannel", [])
digital_cameras = deep_get(
(await self.request(GET, "ContentMgmt/InputProxy/channels")),
"InputProxyChannelList.InputProxyChannel", []
)

if not isinstance(digital_cameras, list):
digital_cameras = [digital_cameras]

_LOGGER.debug(
"%s/ISAPI/ContentMgmt/InputProxy/channels %s",
self.isapi.host,
digital_cameras,
)

for digital_camera in digital_cameras:
camera_id = digital_camera.get("id")
source = digital_camera.get("sourceInputPortDescriptor")
Expand Down Expand Up @@ -309,21 +302,14 @@ async def get_cameras(self):

# Get analog cameras
if self.device_info.support_analog_cameras > 0:
analog_cameras = (
(await self.isapi.System.Video.inputs.channels(method=GET))
.get("VideoInputChannelList", {})
.get("VideoInputChannel", [])
analog_cameras = deep_get(
(await self.request(GET, "System/Video/inputs/channels")),
"VideoInputChannelList.VideoInputChannel", []
)

if not isinstance(analog_cameras, list):
analog_cameras = [analog_cameras]

_LOGGER.debug(
"%s/ISAPI/System/Video/inputs %s",
self.isapi.host,
analog_cameras,
)

for analog_camera in analog_cameras:
camera_id = analog_camera.get("id")
device_serial_no = f"{self.device_info.serial_no}-VI{camera_id}"
Expand All @@ -350,25 +336,16 @@ async def get_cameras(self):

async def get_protocols(self):
"""Get protocols and ports."""
try:
protocols = deep_get(
await self.isapi.Security.adminAccesses(method=GET),
"AdminAccessProtocolList.AdminAccessProtocol",
[],
)
_LOGGER.debug(
"%s/ISAPI/Security/adminAccesses %s",
self.isapi.host,
protocols,
)

for item in protocols:
if item.get("protocol") == "RTSP" and item.get("portNo"):
self.device_info.rtsp_port = item.get("portNo")
break
protocols = deep_get(
await self.request(GET, "Security/adminAccesses"),
"AdminAccessProtocolList.AdminAccessProtocol",
[],
)

except HTTPStatusError:
pass
for item in protocols:
if item.get("protocol") == "RTSP" and item.get("portNo"):
self.device_info.rtsp_port = item.get("portNo")
break

async def get_device_event_capabilities(
self,
Expand Down Expand Up @@ -410,13 +387,12 @@ async def get_device_event_capabilities(
async def get_supported_events_info(self):
"""Get list of all supported events available."""
events = []
event_triggers = await self.isapi.Event.triggers(method=GET)
event_triggers = await self.request(GET, "Event/triggers")
event_notification = event_triggers.get("EventNotification")
_LOGGER.debug("%s/ISAPI/Event/triggers %s", self.isapi.host, event_triggers)
if event_notification:
supported_events = deep_get(event_notification, "EventTriggerList.EventTrigger")
supported_events = deep_get(event_notification, "EventTriggerList.EventTrigger", [])
else:
supported_events = deep_get(event_triggers, "EventTriggerList.EventTrigger")
supported_events = deep_get(event_triggers, "EventTriggerList.EventTrigger", [])

for support_event in supported_events:
notifications = support_event.get("EventTriggerNotificationList", {})
Expand Down Expand Up @@ -479,31 +455,23 @@ async def get_camera_streams(self, channel_id: int) -> list[CameraStreamInfo]:
"""Get stream info for all cameras."""
streams = []
for stream_type_id, stream_type in STREAM_TYPE.items():
try:
stream_id = f"{channel_id}0{stream_type_id}"
stream_info = (await self.isapi.Streaming.channels[stream_id](method=GET)).get("StreamingChannel")
_LOGGER.debug(
"%s/ISAPI/Streaming/channels/%s %s",
self.isapi.host,
stream_id,
stream_info,
)
streams.append(
CameraStreamInfo(
id=int(stream_info["id"]),
name=stream_info["channelName"],
type_id=stream_type_id,
type=stream_type,
enabled=stream_info["enabled"],
codec=stream_info["Video"]["videoCodecType"],
width=stream_info["Video"]["videoResolutionWidth"],
height=stream_info["Video"]["videoResolutionHeight"],
audio=deep_get(stream_info, "Audio.enabled", False),
)
)
except HTTPStatusError:
# If http 400 then does not support this stream type
stream_id = f"{channel_id}0{stream_type_id}"
stream_info = (await self.request(GET, f"Streaming/channels/{stream_id}")).get("StreamingChannel")
if not stream_info:
continue
streams.append(
CameraStreamInfo(
id=int(stream_info["id"]),
name=stream_info["channelName"],
type_id=stream_type_id,
type=stream_type,
enabled=stream_info["enabled"],
codec=deep_get(stream_info, "Video.videoCodecType"),
width=deep_get(stream_info, "Video.videoResolutionWidth", 0),
height=deep_get(stream_info, "Video.videoResolutionHeight", 0),
audio=deep_get(stream_info, "Audio.enabled", False),
)
)
return streams

def get_camera_by_id(self, camera_id: int) -> IPCamera | AnalogCamera | None:
Expand All @@ -517,13 +485,15 @@ def get_camera_by_id(self, camera_id: int) -> IPCamera | AnalogCamera | None:
async def get_storage_devices(self):
"""Get HDD storage devices."""
storage_list = []
storage_info = (await self.isapi.ContentMgmt.Storage(method=GET)).get("storage", {}).get("hddList", {})
storage_info = (
(await self.request(GET, "ContentMgmt/Storage", ignore_exception=False))
.get("storage", {})
.get("hddList", {})
)

if not isinstance(storage_info, list):
storage_info = [storage_info]

_LOGGER.debug("%s/ISAPI/ContentMgmt/Storage %s", self.isapi.host, storage_info)

if "hdd" not in storage_info:
return storage_list

Expand Down Expand Up @@ -595,12 +565,11 @@ def get_event_state_node(self, event: EventInfo) -> str:
if camera.connection_type == CONNECTION_TYPE_PROXIED and EVENTS[event.id].get("proxied_node"):
slug = EVENTS[event.id]["proxied_node"]

node = slug[0].upper() + slug[1:]
return node
return slug[0].upper() + slug[1:]

async def get_event_enabled_state(self, event: EventInfo) -> bool:
"""Get event detection state."""
state = await self.request(GET, event.url)
state = await self.request(GET, event.url, ignore_exception=False)
node = self.get_event_state_node(event)
return str_to_bool(state[node].get("enabled", False)) if state.get(node) else False

Expand Down Expand Up @@ -666,24 +635,10 @@ async def set_event_enabled_state(self, channel_id: int, event: EventInfo, is_en
async def get_port_status(self, port_type: str, port_no: int) -> str:
"""Get status of physical ports."""
if port_type == "input":
status = await self.isapi.System.IO.inputs[port_no].status(method=GET)
_LOGGER.debug(
"%s/ISAPI/System/IO/inputs/%s/status %s",
self.isapi.host,
port_no,
status,
)
status = await self.request(GET, f"System/IO/inputs/{port_no}/status", ignore_exception=False)
else:
status = await self.isapi.System.IO.outputs[port_no].status(method=GET)
_LOGGER.debug(
"%s/ISAPI/System/IO/outputs/%s/status %s",
self.isapi.host,
port_no,
status,
)

if status.get("IOPortStatus"):
return status["IOPortStatus"].get("ioState")
status = await self.request(GET, f"System/IO/outputs/{port_no}/status", ignore_exception=False)
return deep_get(status, "IOStatus.ioState")

async def set_port_state(self, port_no: int, turn_on: bool):
"""Set status of output port."""
Expand All @@ -705,15 +660,14 @@ async def set_port_state(self, port_no: int, turn_on: bool):
async def get_holiday_enabled_state(self, holiday_index=0) -> bool:
"""Get holiday state."""

data = await self.isapi.System.Holidays(method=GET)
data = await self.request(GET, "System/Holidays", ignore_exception=False)
holiday = data["HolidayList"]["holiday"][holiday_index]
return str_to_bool(holiday["enabled"]["#text"])

async def set_holiday_enabled_state(self, is_enabled: bool, holiday_index=0) -> None:
"""Enable or disable holiday, by enable set time span to year starting from today."""

data = await self.isapi.System.Holidays(method=GET)
_LOGGER.debug("%s/ISAPI/System/Holidays %s", self.isapi.host, data)
data = await self.request(GET, "System/Holidays")
holiday = data["HolidayList"]["holiday"][holiday_index]
new_state = bool_to_str(is_enabled)
if new_state == holiday["enabled"]["#text"]:
Expand All @@ -729,8 +683,7 @@ async def set_holiday_enabled_state(self, is_enabled: bool, holiday_index=0) ->
holiday.pop("holidayWeek", None)
holiday.pop("holidayMonth", None)
xml = xmltodict.unparse(data)
response = await self.isapi.System.Holidays(method=PUT, data=xml)
_LOGGER.debug("[PUT] %s/ISAPI/System/Holidays %s", self.isapi.host, response)
await self.request(PUT, "System/Holidays", present="xml", data=xml)

def _get_event_notification_host(self, data: Node) -> Node:
hosts = deep_get(data, "HttpHostNotificationList.HttpHostNotification", {})
Expand All @@ -743,30 +696,23 @@ def _get_event_notification_host(self, data: Node) -> Node:
async def get_alarm_server(self) -> AlarmServer | None:
"""Get event notifications listener server URL."""

try:
data = await self.isapi.Event.notification.httpHosts(method=GET)
except HTTPStatusError:
return None

_LOGGER.debug("%s/ISAPI/Event/notification/httpHosts %s", self.isapi.host, data)

data = await self.request(GET, "Event/notification/httpHosts")
host = self._get_event_notification_host(data)

alarm_server = AlarmServer(
return AlarmServer(
ipAddress=host.get("ipAddress"),
portNo=int(host.get("portNo")),
url=host.get("url"),
protocolType=host.get("protocolType"),
)
_LOGGER.debug("Alarm Server: %s", alarm_server)
return alarm_server

async def set_alarm_server(self, base_url: str, path: str) -> None:
"""Set event notifications listener server."""

address = urlparse(base_url)
data = await self.isapi.Event.notification.httpHosts(method=GET)
_LOGGER.debug("%s/ISAPI/Event/notification/httpHosts %s", self.isapi.host, data)
data = await self.request(GET, "Event/notification/httpHosts")
if not data:
return
host = self._get_event_notification_host(data)
if (
host["protocolType"] == address.scheme.upper()
Expand All @@ -784,21 +730,32 @@ async def set_alarm_server(self, base_url: str, path: str) -> None:
host["httpAuthenticationMethod"] = "none"

xml = xmltodict.unparse(data)
response = await self.isapi.Event.notification.httpHosts(method=PUT, data=xml)
_LOGGER.debug(
"[PUT] %s/ISAPI/Event/notification/httpHosts %s",
self.isapi.host,
response,
)
await self.request(PUT, "Event/notification/httpHosts", present="xml", data=xml)

async def request(self, method: str, url: str, present: str = "dict", **data) -> Any:
"""Send request."""
async def request(
self,
method: str,
url: str,
present: str = "dict",
ignore_exception: bool = True,
**data,
) -> Any:
"""Send request and log response, returns {} if request fails."""

full_url = f"{self.isapi.host}/{self.isapi.isapi_prefix}/{url}"
try:
return await self.isapi.common_request(method, full_url, present, self.isapi.timeout, **data)
response = await self.isapi.common_request(method, full_url, present, self.isapi.timeout, **data)
_LOGGER.debug("--- [%s] %s", method.upper(), full_url)
if data:
_LOGGER.debug(">>> payload:\n%s", data)
_LOGGER.debug("\n%s", response)
except HTTPStatusError as ex:
raise ex
_LOGGER.warning("--- [%s] %s\n%s", method.upper(), full_url, ex)
if ignore_exception:
return {}
raise
else:
return response

def handle_exception(self, ex: Exception, details: str = "") -> bool:
"""Handle common exception, returns False if exception remains unhandled."""
Expand Down

0 comments on commit 4cec164

Please sign in to comment.