Skip to content

Commit

Permalink
feat: add more reliability to p2p?
Browse files Browse the repository at this point in the history
  • Loading branch information
fuatakgun committed Nov 24, 2023
1 parent 3cfa3bd commit 47ac0d5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
4 changes: 2 additions & 2 deletions custom_components/eufy_security/eufy_security_api/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ async def _handle_livestream_video_data_received(self, event: Event):
await self.video_queue.put(event.data["buffer"]["data"])

async def _handle_livestream_audio_data_received(self, event: Event):
pass
#await self.audio_queue.put(event.data["buffer"]["data"])
#pass
await self.audio_queue.put(event.data["buffer"]["data"])

async def _initiate_start_stream(self, stream_type) -> bool:
self.set_stream_prodiver(stream_type)
Expand Down
23 changes: 19 additions & 4 deletions custom_components/eufy_security/eufy_security_api/p2p_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import logging
import socket
import threading
import json
from time import sleep
import traceback
Expand All @@ -20,14 +21,20 @@ def __init__(self, camera) -> None:
self.camera = camera

async def chunk_generator(self, queue):
while True:
retry = 0
max_retry = 10
while retry < max_retry:
try:
item = await asyncio.wait_for(queue.get(), timeout=2.5)
item = queue.get_nowait()
retry = 0
_LOGGER.debug(f"chunk_generator yield data - {len(item)}")
yield bytearray(item)
except TimeoutError as te:
_LOGGER.debug(f"chunk_generator timeout Exception %s - traceback: %s", te, traceback.format_exc())
raise te
except asyncio.QueueEmpty as qe:
retry = retry + 1
await asyncio.sleep(0.1)

async def write_bytes(self, queue):
url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT)
Expand Down Expand Up @@ -63,12 +70,19 @@ async def create_stream_on_go2rtc(self):
result = response.status, await response.text()
_LOGGER.debug(f"create_stream_on_go2rtc - put stream response {result}")

def p2p_worker(self):
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
new_loop.run_until_complete(self.write_bytes(self.camera.video_queue))
return

async def start(self):
"""start streaming thread"""
# send API command to go2rtc to create a new stream
await self.create_stream_on_go2rtc()
asyncio.get_event_loop().create_task(self.write_bytes(self.camera.video_queue))
# asyncio.get_event_loop().create_task(self.write_bytes(self.camera.audio_queue))
p2p_thread = threading.Thread(target=self.p2p_worker, daemon=True)
p2p_thread.start()
#asyncio.new_event_loop().create_task(self.write_bytes(self.camera.audio_queue))


async def stop(self):
Expand All @@ -79,3 +93,4 @@ async def stop(self, retry: boolean):
await asyncio.sleep(5)
if retry is True:
await self.camera.start_livestream()

0 comments on commit 47ac0d5

Please sign in to comment.