Skip to content

Commit

Permalink
feat: start using deque for faster process with audio
Browse files Browse the repository at this point in the history
  • Loading branch information
fuatakgun committed Nov 29, 2023
1 parent a5aa613 commit af7eacf
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 49 deletions.
42 changes: 19 additions & 23 deletions custom_components/eufy_security/eufy_security_api/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import threading
from base64 import b64decode
from collections import deque
import datetime
import traceback

Expand Down Expand Up @@ -52,8 +53,8 @@ def __init__(self, api, serial_no: str, properties: dict, metadata: dict, comman
self.stream_provider: StreamProvider = None
self.stream_url: str = None

self.video_queue = asyncio.Queue()
self.audio_queue = asyncio.Queue()
self.video_queue = deque()
self.audio_queue = deque()
self.config = config
self.voices = voices
self.image_last_updated = None
Expand Down Expand Up @@ -86,8 +87,8 @@ async def _handle_livestream_stopped(self, event: Event):
# automatically find this function for respective event
_LOGGER.debug(f"_handle_livestream_stopped - {event}")
self.stream_status = StreamStatus.IDLE
self.video_queue = asyncio.Queue()
self.audio_queue = asyncio.Queue()
self.video_queue = deque()
self.audio_queue = deque()

async def _handle_rtsp_livestream_started(self, event: Event):
# automatically find this function for respective event
Expand All @@ -100,11 +101,12 @@ async def _handle_rtsp_livestream_stopped(self, event: Event):
self.stream_status = StreamStatus.IDLE

async def _handle_livestream_video_data_received(self, event: Event):
await self.video_queue.put(bytearray(event.data["buffer"]["data"]))
#_LOGGER.debug(f"_handle_rtsp_livestream_stopped - {event}")
self.video_queue.append(bytearray(event.data["buffer"]["data"]))

async def _handle_livestream_audio_data_received(self, event: Event):
pass
#await self.audio_queue.put(bytearray(event.data["buffer"]["data"]))
#pass
self.audio_queue.append(bytearray(event.data["buffer"]["data"]))

async def _initiate_start_stream(self, stream_type) -> bool:
self.set_stream_prodiver(stream_type)
Expand Down Expand Up @@ -135,31 +137,25 @@ async def _initiate_start_stream(self, stream_type) -> bool:

async def check_live_stream(self):
if self.p2p_streamer.retry is not None:
await self.async_restart_livestream(self.p2p_streamer.retry)
_LOGGER.debug(f"async_restart_livestream - start - {self.p2p_streamer.retry}")
if self.stream_status != StreamStatus.IDLE:
await self.stop_livestream()
if self.p2p_streamer.retry is True:
_LOGGER.debug(f"async_restart_livestream - sleep - {self.p2p_streamer.retry}")
await asyncio.sleep(1)
_LOGGER.debug(f"async_restart_livestream - start live stream finish - {self.p2p_streamer.retry}")
await self.start_livestream()
_LOGGER.debug(f"async_restart_livestream - start live stream end - {self.p2p_streamer.retry}")

async def start_livestream(self) -> bool:
"""Process start p2p livestream call"""
self.stream_future = asyncio.create_task(self.p2p_streamer.start())
if await self._initiate_start_stream(StreamProvider.P2P) is False:
return False

self.stream_future = asyncio.create_task(self.p2p_streamer.start())
self.stream_checker = asyncio.create_task(self.check_live_stream())

self.stream_status = StreamStatus.STREAMING
return True

async def async_restart_livestream(self, retry):
_LOGGER.debug(f"async_restart_livestream - start - {retry}")
if self.stream_status != StreamStatus.IDLE:
await self.stop_livestream()
_LOGGER.debug(f"async_restart_livestream - cont - {retry}")
if retry is True:
_LOGGER.debug(f"async_restart_livestream - sleep 5 seconds - {retry}")
await asyncio.sleep(1)
_LOGGER.debug(f"async_restart_livestream - start live stream finish - {retry}")
await self.start_livestream()
_LOGGER.debug(f"async_restart_livestream - start live stream end - {retry}")

async def stop_livestream(self):
"""Process stop p2p livestream call"""
await self.api.stop_livestream(self.product_type, self.serial_no)
Expand Down
60 changes: 37 additions & 23 deletions custom_components/eufy_security/eufy_security_api/p2p_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,60 @@ def __init__(self, camera) -> None:
self.camera = camera
self.retry = None

async def chunk_generator(self, queue):
retry = 0
max_retry = 10
while retry < max_retry:
async def chunk_generator(self, queue, queue_name):
retry_count = 0
max_retry_count = 10
try:
await asyncio.wait_for(self.camera.p2p_started_event.wait(), 5)
except asyncio.TimeoutError as te:
_LOGGER.debug(f"chunk_generator {queue_name} - event did not receive in timeout")
raise te

while retry_count < max_retry_count:
try:
item = queue.get_nowait()
_LOGGER.debug(f"chunk_generator yield data {retry} - {len(item)}")
retry = 0
item = queue.popleft()
_LOGGER.debug(f"chunk_generator {queue_name} yield data {retry_count} - {len(item)}")
retry_count = 0
yield item
except TimeoutError as te:
_LOGGER.debug(f"chunk_generator timeout Exception %s - traceback: %s", te, traceback.format_exc())
raise te
except asyncio.QueueEmpty as qe:
retry = retry + 1
except IndexError as qe:
retry_count = retry_count + 1
await asyncio.sleep(0.1)

async def write_bytes(self, queue):
async def write_bytes(self, queue, queue_name):
url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT)
url = f"{url}?dst={str(self.camera.serial_no)}"

self.retry = False
self.retry = None
try:
async with aiohttp.ClientSession() as session:
resp = await session.post(url, data = self.chunk_generator(queue), timeout=aiohttp.ClientTimeout(total=None, connect=5))
_LOGGER.debug(f"write_bytes - post response - {resp.status} - {await resp.text()}")
_LOGGER.debug("write_bytes - post ended - retry")
resp = await session.post(url, data = self.chunk_generator(queue, queue_name), timeout=aiohttp.ClientTimeout(total=None, connect=5))
_LOGGER.debug(f"write_bytes {queue_name} - post response - {resp.status} - {await resp.text()}")
_LOGGER.debug("write_bytes - post ended - no retry")
self.retry = False
except (asyncio.exceptions.TimeoutError, asyncio.exceptions.CancelledError) as ex:
# live stream probabaly stopped, handle peacefully
_LOGGER.debug(f"write_bytes timeout/cancelled exception %s - traceback: %s", ex, traceback.format_exc())
_LOGGER.debug(f"write_bytes {queue_name} timeout/cancelled no retry exception {ex} - traceback: {traceback.format_exc()}")
self.retry = False
except aiohttp.client_exceptions.ServerDisconnectedError as ex:
# connection to go2rtc server is broken, try again
_LOGGER.debug(f"write_bytes server_disconnected exception %s - traceback: %s", ex, traceback.format_exc())
# connection to go2rtc server is broken, try again``
_LOGGER.debug(f"write_bytes {queue_name} server_disconnected retry exception {ex} - traceback: {traceback.format_exc()}")
self.retry = True
except Exception as ex: # pylint: disable=broad-except
# other exceptions, log the error
_LOGGER.debug(f"write_bytes general exception %s - traceback: %s", ex, traceback.format_exc())
_LOGGER.debug(f"write_bytes {queue_name} general exception no retry {ex} - traceback: {traceback.format_exc()}")
self.retry = False

_LOGGER.debug("write_bytes - ended")
_LOGGER.debug("write_bytes {queue_name} - ended")

async def create_stream_on_go2rtc(self):
parameters = {"name": str(self.camera.serial_no)}
url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT)
url = f"{url}s"
async with aiohttp.ClientSession() as session:
async with session.delete(url, params=parameters) as response:
result = response.status, await response.text()
_LOGGER.debug(f"create_stream_on_go2rtc - delete stream response {result}")

parameters = {"name": str(self.camera.serial_no), "src": str(self.camera.serial_no)}
url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT)
url = f"{url}s"
Expand All @@ -76,4 +87,7 @@ async def start(self):
"""start streaming thread"""
# send API command to go2rtc to create a new stream
await self.create_stream_on_go2rtc()
await self.write_bytes(self.camera.video_queue)
await asyncio.gather(
self.write_bytes(self.camera.audio_queue, "audio"),
self.write_bytes(self.camera.video_queue, "video")
)
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(
async def connect(self):
"""Set up web socket connection"""
try:
self.socket = await self.session.ws_connect(f"ws://{self.host}:{self.port}", autoclose=False, autoping=True, heartbeat=60)
self.socket = await self.session.ws_connect(f"ws://{self.host}:{self.port}", heartbeat=10, compress=9)
except Exception as exc:
raise WebSocketConnectionException("Connection to add-on was broken. please reload the integration!") from exc
self.task = self.loop.create_task(self._process_messages())
Expand Down
4 changes: 2 additions & 2 deletions custom_components/eufy_security/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ def native_value(self):
"""Return the value reported by the sensor."""
if self.metadata.name in CameraSensor.__members__:
if self.metadata.name == CameraSensor.video_queue_size.name:
return self.product.video_queue.qsize()
return len(self.product.video_queue)
if self.metadata.name == CameraSensor.audio_queue_size.name:
return self.product.audio_queue.qsize()
return len(self.product.audio_queue)
if self.metadata.name == CameraSensor.stream_provider.name:
return self.product.stream_provider.name
return get_child_value(self.product.__dict__, self.metadata.name)
Expand Down

0 comments on commit af7eacf

Please sign in to comment.