diff --git a/custom_components/eufy_security/eufy_security_api/camera.py b/custom_components/eufy_security/eufy_security_api/camera.py index eadbdb7..11c250f 100644 --- a/custom_components/eufy_security/eufy_security_api/camera.py +++ b/custom_components/eufy_security/eufy_security_api/camera.py @@ -4,6 +4,7 @@ import logging import threading from base64 import b64decode +from collections import deque import datetime import traceback @@ -52,8 +53,8 @@ def __init__(self, api, serial_no: str, properties: dict, metadata: dict, comman self.stream_provider: StreamProvider = None self.stream_url: str = None - self.video_queue = asyncio.Queue() - self.audio_queue = asyncio.Queue() + self.video_queue = deque() + self.audio_queue = deque() self.config = config self.voices = voices self.image_last_updated = None @@ -86,8 +87,8 @@ async def _handle_livestream_stopped(self, event: Event): # automatically find this function for respective event _LOGGER.debug(f"_handle_livestream_stopped - {event}") self.stream_status = StreamStatus.IDLE - self.video_queue = asyncio.Queue() - self.audio_queue = asyncio.Queue() + self.video_queue = deque() + self.audio_queue = deque() async def _handle_rtsp_livestream_started(self, event: Event): # automatically find this function for respective event @@ -100,11 +101,12 @@ async def _handle_rtsp_livestream_stopped(self, event: Event): self.stream_status = StreamStatus.IDLE async def _handle_livestream_video_data_received(self, event: Event): - await self.video_queue.put(bytearray(event.data["buffer"]["data"])) + #_LOGGER.debug(f"_handle_rtsp_livestream_stopped - {event}") + self.video_queue.append(bytearray(event.data["buffer"]["data"])) async def _handle_livestream_audio_data_received(self, event: Event): - pass - #await self.audio_queue.put(bytearray(event.data["buffer"]["data"])) + #pass + self.audio_queue.append(bytearray(event.data["buffer"]["data"])) async def _initiate_start_stream(self, stream_type) -> bool: self.set_stream_prodiver(stream_type) @@ -135,31 +137,25 @@ async def _initiate_start_stream(self, stream_type) -> bool: async def check_live_stream(self): if self.p2p_streamer.retry is not None: - await self.async_restart_livestream(self.p2p_streamer.retry) + _LOGGER.debug(f"async_restart_livestream - start - {self.p2p_streamer.retry}") + if self.stream_status != StreamStatus.IDLE: + await self.stop_livestream() + if self.p2p_streamer.retry is True: + _LOGGER.debug(f"async_restart_livestream - sleep - {self.p2p_streamer.retry}") + await asyncio.sleep(1) + _LOGGER.debug(f"async_restart_livestream - start live stream finish - {self.p2p_streamer.retry}") + await self.start_livestream() + _LOGGER.debug(f"async_restart_livestream - start live stream end - {self.p2p_streamer.retry}") async def start_livestream(self) -> bool: """Process start p2p livestream call""" + self.stream_future = asyncio.create_task(self.p2p_streamer.start()) if await self._initiate_start_stream(StreamProvider.P2P) is False: return False - - self.stream_future = asyncio.create_task(self.p2p_streamer.start()) self.stream_checker = asyncio.create_task(self.check_live_stream()) - self.stream_status = StreamStatus.STREAMING return True - async def async_restart_livestream(self, retry): - _LOGGER.debug(f"async_restart_livestream - start - {retry}") - if self.stream_status != StreamStatus.IDLE: - await self.stop_livestream() - _LOGGER.debug(f"async_restart_livestream - cont - {retry}") - if retry is True: - _LOGGER.debug(f"async_restart_livestream - sleep 5 seconds - {retry}") - await asyncio.sleep(1) - _LOGGER.debug(f"async_restart_livestream - start live stream finish - {retry}") - await self.start_livestream() - _LOGGER.debug(f"async_restart_livestream - start live stream end - {retry}") - async def stop_livestream(self): """Process stop p2p livestream call""" await self.api.stop_livestream(self.product_type, self.serial_no) diff --git a/custom_components/eufy_security/eufy_security_api/p2p_streamer.py b/custom_components/eufy_security/eufy_security_api/p2p_streamer.py index 2307f38..8604a0f 100644 --- a/custom_components/eufy_security/eufy_security_api/p2p_streamer.py +++ b/custom_components/eufy_security/eufy_security_api/p2p_streamer.py @@ -21,49 +21,60 @@ def __init__(self, camera) -> None: self.camera = camera self.retry = None - async def chunk_generator(self, queue): - retry = 0 - max_retry = 10 - while retry < max_retry: + async def chunk_generator(self, queue, queue_name): + retry_count = 0 + max_retry_count = 10 + try: + await asyncio.wait_for(self.camera.p2p_started_event.wait(), 5) + except asyncio.TimeoutError as te: + _LOGGER.debug(f"chunk_generator {queue_name} - event did not receive in timeout") + raise te + + while retry_count < max_retry_count: try: - item = queue.get_nowait() - _LOGGER.debug(f"chunk_generator yield data {retry} - {len(item)}") - retry = 0 + item = queue.popleft() + _LOGGER.debug(f"chunk_generator {queue_name} yield data {retry_count} - {len(item)}") + retry_count = 0 yield item - except TimeoutError as te: - _LOGGER.debug(f"chunk_generator timeout Exception %s - traceback: %s", te, traceback.format_exc()) - raise te - except asyncio.QueueEmpty as qe: - retry = retry + 1 + except IndexError as qe: + retry_count = retry_count + 1 await asyncio.sleep(0.1) - async def write_bytes(self, queue): + async def write_bytes(self, queue, queue_name): url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT) url = f"{url}?dst={str(self.camera.serial_no)}" - self.retry = False + self.retry = None try: async with aiohttp.ClientSession() as session: - resp = await session.post(url, data = self.chunk_generator(queue), timeout=aiohttp.ClientTimeout(total=None, connect=5)) - _LOGGER.debug(f"write_bytes - post response - {resp.status} - {await resp.text()}") - _LOGGER.debug("write_bytes - post ended - retry") + resp = await session.post(url, data = self.chunk_generator(queue, queue_name), timeout=aiohttp.ClientTimeout(total=None, connect=5)) + _LOGGER.debug(f"write_bytes {queue_name} - post response - {resp.status} - {await resp.text()}") + _LOGGER.debug("write_bytes - post ended - no retry") self.retry = False except (asyncio.exceptions.TimeoutError, asyncio.exceptions.CancelledError) as ex: # live stream probabaly stopped, handle peacefully - _LOGGER.debug(f"write_bytes timeout/cancelled exception %s - traceback: %s", ex, traceback.format_exc()) + _LOGGER.debug(f"write_bytes {queue_name} timeout/cancelled no retry exception {ex} - traceback: {traceback.format_exc()}") self.retry = False except aiohttp.client_exceptions.ServerDisconnectedError as ex: - # connection to go2rtc server is broken, try again - _LOGGER.debug(f"write_bytes server_disconnected exception %s - traceback: %s", ex, traceback.format_exc()) + # connection to go2rtc server is broken, try again`` + _LOGGER.debug(f"write_bytes {queue_name} server_disconnected retry exception {ex} - traceback: {traceback.format_exc()}") self.retry = True except Exception as ex: # pylint: disable=broad-except # other exceptions, log the error - _LOGGER.debug(f"write_bytes general exception %s - traceback: %s", ex, traceback.format_exc()) + _LOGGER.debug(f"write_bytes {queue_name} general exception no retry {ex} - traceback: {traceback.format_exc()}") self.retry = False - _LOGGER.debug("write_bytes - ended") + _LOGGER.debug("write_bytes {queue_name} - ended") async def create_stream_on_go2rtc(self): + parameters = {"name": str(self.camera.serial_no)} + url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT) + url = f"{url}s" + async with aiohttp.ClientSession() as session: + async with session.delete(url, params=parameters) as response: + result = response.status, await response.text() + _LOGGER.debug(f"create_stream_on_go2rtc - delete stream response {result}") + parameters = {"name": str(self.camera.serial_no), "src": str(self.camera.serial_no)} url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT) url = f"{url}s" @@ -76,4 +87,7 @@ async def start(self): """start streaming thread""" # send API command to go2rtc to create a new stream await self.create_stream_on_go2rtc() - await self.write_bytes(self.camera.video_queue) \ No newline at end of file + await asyncio.gather( + self.write_bytes(self.camera.audio_queue, "audio"), + self.write_bytes(self.camera.video_queue, "video") + ) diff --git a/custom_components/eufy_security/eufy_security_api/web_socket_client.py b/custom_components/eufy_security/eufy_security_api/web_socket_client.py index 2c15853..0945d96 100644 --- a/custom_components/eufy_security/eufy_security_api/web_socket_client.py +++ b/custom_components/eufy_security/eufy_security_api/web_socket_client.py @@ -39,7 +39,7 @@ def __init__( async def connect(self): """Set up web socket connection""" try: - self.socket = await self.session.ws_connect(f"ws://{self.host}:{self.port}", autoclose=False, autoping=True, heartbeat=60) + self.socket = await self.session.ws_connect(f"ws://{self.host}:{self.port}", heartbeat=10, compress=9) except Exception as exc: raise WebSocketConnectionException("Connection to add-on was broken. please reload the integration!") from exc self.task = self.loop.create_task(self._process_messages()) diff --git a/custom_components/eufy_security/sensor.py b/custom_components/eufy_security/sensor.py index 18ebe41..f9bafee 100644 --- a/custom_components/eufy_security/sensor.py +++ b/custom_components/eufy_security/sensor.py @@ -54,9 +54,9 @@ def native_value(self): """Return the value reported by the sensor.""" if self.metadata.name in CameraSensor.__members__: if self.metadata.name == CameraSensor.video_queue_size.name: - return self.product.video_queue.qsize() + return len(self.product.video_queue) if self.metadata.name == CameraSensor.audio_queue_size.name: - return self.product.audio_queue.qsize() + return len(self.product.audio_queue) if self.metadata.name == CameraSensor.stream_provider.name: return self.product.stream_provider.name return get_child_value(self.product.__dict__, self.metadata.name)