From 217007a7b28fe1370b8b844adbcbc55adb2d324e Mon Sep 17 00:00:00 2001 From: Brett Jia Date: Wed, 11 Jan 2023 00:20:15 -0500 Subject: [PATCH] beta: arlo talkback (basestation only) (#475) * initial research * experimental * sdp seems to work, stopIntercom is broken * converting to separate rtc device * standalone rtc speaker implemented but not working * minor cleanup * webrtc troubleshooting and improvements * wait for sdp response before sending candidates * logging changes, rtc troubleshooting * use a future * restore aiortc and run RTC in a background thread * documentation * documentation * typo * comments + simplify background coroutine calls * simplify future * only enable intercom for basestation cameras * bump 0.4.0 * backward compatible headers * bump 0.4.1 * pin cryptography to latest without rust * monkey patch binary deps and pin cryptography for armv7l * hacks to use ffmpeg but fails due to dependency on pylibsrtp * revert back to 0.4.2 * 0.4.3 fix for M1 Macs * use pre-built armv7l wheels * publish 0.4.4 * use custom pypi index for armv7l wheels --- plugins/arlo/package-lock.json | 6 +- plugins/arlo/package.json | 2 +- .../arlo/src/arlo_plugin/arlo/arlo_async.py | 140 ++++++++++++++++-- plugins/arlo/src/arlo_plugin/arlo/logging.py | 4 +- .../src/arlo_plugin/arlo/mqtt_stream_async.py | 2 +- plugins/arlo/src/arlo_plugin/arlo/request.py | 4 +- plugins/arlo/src/arlo_plugin/camera.py | 127 +++++++++++++++- plugins/arlo/src/arlo_plugin/doorbell.py | 7 +- plugins/arlo/src/arlo_plugin/logging.py | 5 +- plugins/arlo/src/arlo_plugin/provider.py | 71 +++++---- .../arlo/src/arlo_plugin/rtcpeerconnection.py | 124 ++++++++++++++++ plugins/arlo/src/arlo_plugin/util.py | 22 +++ plugins/arlo/src/requirements.txt | 6 +- 13 files changed, 466 insertions(+), 54 deletions(-) create mode 100644 plugins/arlo/src/arlo_plugin/rtcpeerconnection.py create mode 100644 plugins/arlo/src/arlo_plugin/util.py diff --git a/plugins/arlo/package-lock.json b/plugins/arlo/package-lock.json index b9b3b166d7..3f9168fa54 100644 --- a/plugins/arlo/package-lock.json +++ b/plugins/arlo/package-lock.json @@ -1,19 +1,19 @@ { "name": "@scrypted/arlo", - "version": "0.3.7", + "version": "0.4.5", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/arlo", - "version": "0.3.7", + "version": "0.4.5", "devDependencies": { "@scrypted/sdk": "file:../../sdk" } }, "../../sdk": { "name": "@scrypted/sdk", - "version": "0.2.27", + "version": "0.2.28", "dev": true, "license": "ISC", "dependencies": { diff --git a/plugins/arlo/package.json b/plugins/arlo/package.json index 93413c83bd..788867fff6 100644 --- a/plugins/arlo/package.json +++ b/plugins/arlo/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/arlo", - "version": "0.3.7", + "version": "0.4.5", "description": "Arlo Plugin for Scrypted", "keywords": [ "scrypted", diff --git a/plugins/arlo/src/arlo_plugin/arlo/arlo_async.py b/plugins/arlo/src/arlo_plugin/arlo/arlo_async.py index 054a6c75a2..b179762d63 100644 --- a/plugins/arlo/src/arlo_plugin/arlo/arlo_async.py +++ b/plugins/arlo/src/arlo_plugin/arlo/arlo_async.py @@ -105,6 +105,8 @@ def float2hex(f): def UseExistingAuth(self, user_id, headers): self.user_id = user_id + if "Content-Type" not in headers: + headers['Content-Type'] = 'application/json; charset=UTF-8' self.request.session.headers.update(headers) self.BASE_URL = 'myapi.arlo.com' @@ -178,6 +180,7 @@ def complete_auth(code): 'Auth-Version': '2', 'Authorization': finish_auth_body['data']['token'], 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_1_2 like Mac OS X) AppleWebKit/604.3.5 (KHTML, like Gecko) Mobile/15B202 NETGEAR/v1 (iOS Vuezone)', + 'Content-Type': 'application/json; charset=UTF-8', } self.request.session.headers.update(headers) self.BASE_URL = 'myapi.arlo.com' @@ -336,6 +339,8 @@ def callback(self, event) This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents() that has a big switch statement in it to handle all the various events Arlo produces. + + Returns the Task object that contains the subscription loop. """ resource = f"cameras/{camera.get('deviceId')}" @@ -348,7 +353,7 @@ def callbackwrapper(self, event): return None return stop - asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) + return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) def SubscribeToBatteryEvents(self, basestation, camera, callback): """ @@ -359,6 +364,8 @@ def callback(self, event) This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents() that has a big switch statement in it to handle all the various events Arlo produces. + + Returns the Task object that contains the subscription loop. """ resource = f"cameras/{camera.get('deviceId')}" @@ -371,7 +378,7 @@ def callbackwrapper(self, event): return None return stop - asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) + return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) def SubscribeToDoorbellEvents(self, basestation, doorbell, callback): """ @@ -382,6 +389,8 @@ def callback(self, event) This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents() that has a big switch statement in it to handle all the various events Arlo produces. + + Returns the Task object that contains the subscription loop. """ resource = f"doorbells/{doorbell.get('deviceId')}" @@ -403,7 +412,59 @@ def callbackwrapper(self, event): return None return stop - asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) + return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper)) + + def SubscribeToSDPAnswers(self, basestation, camera, callback): + """ + Use this method to subscribe to pushToTalk SDP answer events. You must provide a callback function which will get called once per SDP event. + + The callback function should have the following signature: + def callback(self, event) + + This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents() + that has a big switch statement in it to handle all the various events Arlo produces. + + Returns the Task object that contains the subscription loop. + """ + + resource = f"cameras/{camera.get('deviceId')}" + + def callbackwrapper(self, event): + properties = event.get("properties", {}) + stop = None + if properties.get("type") == "answerSdp": + stop = callback(properties.get("data")) + if not stop: + return None + return stop + + return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper)) + + def SubscribeToCandidateAnswers(self, basestation, camera, callback): + """ + Use this method to subscribe to pushToTalk ICE candidate answer events. You must provide a callback function which will get called once per candidate event. + + The callback function should have the following signature: + def callback(self, event) + + This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents() + that has a big switch statement in it to handle all the various events Arlo produces. + + Returns the Task object that contains the subscription loop. + """ + + resource = f"cameras/{camera.get('deviceId')}" + + def callbackwrapper(self, event): + properties = event.get("properties", {}) + stop = None + if properties.get("type") == "answerCandidate": + stop = callback(properties.get("data")) + if not stop: + return None + return stop + + return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper)) async def HandleEvents(self, basestation, resource, actions, callback): """ @@ -443,13 +504,14 @@ async def TriggerAndHandleEvent(self, basestation, resource, actions, trigger, c This function will allow you to potentially write a callback that can handle all of the events received from the event stream. NOTE: Use this function if you need to run some code after subscribing to the eventstream, but before your callback to handle the events runs. """ - if not callable(trigger): + if trigger is not None and not callable(trigger): raise Exception('The trigger(self, camera) should be a callable function.') if not callable(callback): raise Exception('The callback(self, event) should be a callable function.') await self.Subscribe() - trigger(self) + if trigger: + trigger(self) # NOTE: Calling HandleEvents() calls Subscribe() again, which basically turns into a no-op. Hackie I know, but it cleans up the code a bit. return await self.HandleEvents(basestation, resource, actions, callback) @@ -479,11 +541,57 @@ async def StartStream(self, basestation, camera): It can be streamed with: ffmpeg -re -i 'rtsps://' -acodec copy -vcodec copy test.mp4 The request to /users/devices/startStream returns: { url:rtsp://:443/vzmodulelive?egressToken=b&userAgent=iOS&cameraId=} """ - stream_url_dict = self.request.post(f'https://{self.BASE_URL}/hmsweb/users/devices/startStream', {"to":camera.get('parentId'),"from":self.user_id+"_web","resource":"cameras/"+camera.get('deviceId'),"action":"set","responseUrl":"", "publishResponse":True,"transId":self.genTransId(),"properties":{"activityState":"startUserStream","cameraId":camera.get('deviceId')}}, headers={"xcloudId":camera.get('xCloudId')}) + stream_url_dict = self.request.post( + f'https://{self.BASE_URL}/hmsweb/users/devices/startStream', + { + "to": camera.get('parentId'), + "from": self.user_id + "_web", + "resource": "cameras/" + camera.get('deviceId'), + "action": "set", + "responseUrl": "", + "publishResponse": True, + "transId": self.genTransId(), + "properties": { + "activityState": "startUserStream", + "cameraId": camera.get('deviceId') + } + }, + headers={"xcloudId":camera.get('xCloudId')} + ) return stream_url_dict['url'].replace("rtsp://", "rtsps://") - def StopStream(self, basestation, camera): - return self.request.post(f'https://{self.BASE_URL}/hmsweb/users/devices/stopStream', {"to":camera.get('parentId'),"from":self.user_id+"_web","resource":"cameras/"+camera.get('deviceId'),"action":"set","responseUrl":"", "publishResponse":True,"transId":self.genTransId(),"properties":{"activityState":"stopUserStream","cameraId":camera.get('deviceId')}}, headers={"xcloudId": camera.get('xCloudId')}) + def StartPushToTalk(self, basestation, camera): + url = f'https://{self.BASE_URL}/hmsweb/users/devices/{self.user_id}_{camera.get("deviceId")}/pushtotalk' + resp = self.request.get(url) + return resp.get("uSessionId"), resp.get("data") + + def NotifyPushToTalkSDP(self, basestation, camera, uSessionId, localSdp): + resource = f"cameras/{camera.get('deviceId')}" + + self.Notify(basestation, { + "action": "pushToTalk", + "resource": resource, + "publishResponse": True, + "properties": { + "data": localSdp, + "type": "offerSdp", + "uSessionId": uSessionId + } + }) + + def NotifyPushToTalkCandidate(self, basestation, camera, uSessionId, localCandidate): + resource = f"cameras/{camera.get('deviceId')}" + + self.Notify(basestation, { + "action": "pushToTalk", + "resource": resource, + "publishResponse": False, + "properties": { + "data": localCandidate, + "type": "offerCandidate", + "uSessionId": uSessionId + } + }) async def TriggerFullFrameSnapshot(self, basestation, camera): """ @@ -494,7 +602,21 @@ async def TriggerFullFrameSnapshot(self, basestation, camera): resource = f"cameras/{camera.get('deviceId')}" def trigger(self): - self.request.post(f"https://{self.BASE_URL}/hmsweb/users/devices/fullFrameSnapshot", {"to":camera.get("parentId"),"from":self.user_id+"_web","resource":"cameras/"+camera.get("deviceId"),"action":"set","publishResponse":True,"transId":self.genTransId(),"properties":{"activityState":"fullFrameSnapshot"}}, headers={"xcloudId":camera.get("xCloudId")}) + self.request.post( + f"https://{self.BASE_URL}/hmsweb/users/devices/fullFrameSnapshot", + { + "to": camera.get("parentId"), + "from": self.user_id + "_web", + "resource": "cameras/" + camera.get("deviceId"), + "action": "set", + "publishResponse": True, + "transId": self.genTransId(), + "properties": { + "activityState": "fullFrameSnapshot" + } + }, + headers={"xcloudId":camera.get("xCloudId")} + ) def callback(self, event): properties = event.get("properties", {}) diff --git a/plugins/arlo/src/arlo_plugin/arlo/logging.py b/plugins/arlo/src/arlo_plugin/arlo/logging.py index 9c56c4c0a2..d59ded4101 100644 --- a/plugins/arlo/src/arlo_plugin/arlo/logging.py +++ b/plugins/arlo/src/arlo_plugin/arlo/logging.py @@ -2,14 +2,14 @@ import sys # construct logger instance to be used by package arlo -logger = logging.getLogger("arlo") +logger = logging.getLogger("lib") logger.setLevel(logging.INFO) # output logger to stdout ch = logging.StreamHandler(sys.stdout) # log formatting -fmt = logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s") +fmt = logging.Formatter("(arlo) %(levelname)s:%(name)s:%(asctime)s.%(msecs)03d %(message)s", "%H:%M:%S") ch.setFormatter(fmt) # configure handler to logger diff --git a/plugins/arlo/src/arlo_plugin/arlo/mqtt_stream_async.py b/plugins/arlo/src/arlo_plugin/arlo/mqtt_stream_async.py index f2ad67ad4d..9549ae3131 100644 --- a/plugins/arlo/src/arlo_plugin/arlo/mqtt_stream_async.py +++ b/plugins/arlo/src/arlo_plugin/arlo/mqtt_stream_async.py @@ -39,7 +39,7 @@ def on_message(client, userdata, msg): self.event_stream = mqtt.Client(client_id=f"user_{self.arlo.user_id}_{self._gen_client_number()}", transport="websockets", clean_session=False) self.event_stream.username_pw_set(self.arlo.user_id, password=self.arlo.request.session.headers.get('Authorization')) self.event_stream.ws_set_options(path="/mqtt", headers={"Origin": "https://my.arlo.com"}) - self.event_stream.enable_logger(logger=logger) + #self.event_stream.enable_logger(logger=logger) self.event_stream.on_connect = on_connect self.event_stream.on_message = on_message self.event_stream.tls_set() diff --git a/plugins/arlo/src/arlo_plugin/arlo/request.py b/plugins/arlo/src/arlo_plugin/arlo/request.py index 4d9d2b6076..a340d2888c 100644 --- a/plugins/arlo/src/arlo_plugin/arlo/request.py +++ b/plugins/arlo/src/arlo_plugin/arlo/request.py @@ -44,12 +44,12 @@ def _request(self, url, method='GET', params={}, headers={}, stream=False, raw=F import logging import http.client http.client.HTTPConnection.debuglevel = 1 - logging.basicConfig() + #logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) req_log = logging.getLogger('requests.packages.urllib3') req_log.setLevel(logging.DEBUG) req_log.propagate = True - """ + #""" url = f'{url}?eventId={self.gen_event_id()}&time={self.get_time()}' diff --git a/plugins/arlo/src/arlo_plugin/camera.py b/plugins/arlo/src/arlo_plugin/camera.py index fa476a8bf9..6988f029c3 100644 --- a/plugins/arlo/src/arlo_plugin/camera.py +++ b/plugins/arlo/src/arlo_plugin/camera.py @@ -1,12 +1,19 @@ +from aioice import Candidate +from aiortc import RTCSessionDescription, RTCIceGatherer, RTCIceServer +from aiortc.rtcicetransport import candidate_to_aioice, candidate_from_aioice import asyncio +import json import scrypted_sdk from scrypted_sdk import ScryptedDeviceBase -from scrypted_sdk.types import Camera, VideoCamera, MotionSensor, Battery, ScryptedMimeTypes +from scrypted_sdk.types import Camera, VideoCamera, Intercom, MotionSensor, Battery, ScryptedMimeTypes from .logging import ScryptedDeviceLoggerMixin +from .util import BackgroundTaskMixin +from .rtcpeerconnection import BackgroundRTCPeerConnection -class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, MotionSensor, Battery, ScryptedDeviceLoggerMixin): + +class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, Intercom, MotionSensor, Battery, ScryptedDeviceLoggerMixin, BackgroundTaskMixin): timeout = 30 nativeId = None arlo_device = None @@ -16,8 +23,7 @@ class ArloCamera(ScryptedDeviceBase, Camera, VideoCamera, MotionSensor, Battery, def __init__(self, nativeId, arlo_device, arlo_basestation, provider): super().__init__(nativeId=nativeId) - this_class = type(self) - self.logger_name = f"{this_class.__name__}[{nativeId}]" + self.logger_name = f"{nativeId}.camera" self.nativeId = nativeId self.arlo_device = arlo_device @@ -31,6 +37,11 @@ def __init__(self, nativeId, arlo_device, arlo_basestation, provider): self.start_motion_subscription() self.start_battery_subscription() + self.pc = None + self.sdp_answered = False + self.start_sdp_answer_subscription() + self.start_candidate_answer_subscription() + def __del__(self): self.stop_subscriptions = True @@ -39,14 +50,57 @@ def callback(motionDetected): self.motionDetected = motionDetected return self.stop_subscriptions - self.provider.arlo.SubscribeToMotionEvents(self.arlo_basestation, self.arlo_device, callback) + self.register_task( + self.provider.arlo.SubscribeToMotionEvents(self.arlo_basestation, self.arlo_device, callback) + ) def start_battery_subscription(self): def callback(batteryLevel): self.batteryLevel = batteryLevel return self.stop_subscriptions - self.provider.arlo.SubscribeToBatteryEvents(self.arlo_basestation, self.arlo_device, callback) + self.register_task( + self.provider.arlo.SubscribeToBatteryEvents(self.arlo_basestation, self.arlo_device, callback) + ) + + def start_sdp_answer_subscription(self): + def callback(sdp): + if self.pc and not self.sdp_answered: + if "a=mid:" not in sdp: + # arlo appears to not return a mux id in the response, which + # doesn't play nicely with our webrtc peers. let's add it + sdp += "a=mid:0\r\n" + self.logger.info(f"Arlo response sdp:\n{sdp}") + + sdp = RTCSessionDescription(sdp=sdp, type="answer") + self.create_task(self.pc.setRemoteDescription(sdp)) + self.sdp_answered = True + return self.stop_subscriptions + + self.register_task( + self.provider.arlo.SubscribeToSDPAnswers(self.arlo_basestation, self.arlo_device, callback) + ) + + def start_candidate_answer_subscription(self): + def callback(candidate): + if self.pc: + prefix = "a=candidate:" + if candidate.startswith(prefix): + candidate = candidate[len(prefix):] + candidate = candidate.strip() + self.logger.info(f"Arlo response candidate: {candidate}") + + candidate = candidate_from_aioice(Candidate.from_sdp(candidate)) + if candidate.sdpMid is None: + # arlo appears to not return a mux id in the response, which + # doesn't play nicely with aiortc. let's add it + candidate.sdpMid = 0 + self.create_task(self.pc.addIceCandidate(candidate)) + return self.stop_subscriptions + + self.register_task( + self.provider.arlo.SubscribeToCandidateAnswers(self.arlo_basestation, self.arlo_device, callback) + ) async def getPictureOptions(self): return [] @@ -54,7 +108,7 @@ async def getPictureOptions(self): async def takePicture(self, options=None): self.logger.info("Taking picture") - real_device = await scrypted_sdk.systemManager.api.getDeviceById(self.deviceState._id) + real_device = await scrypted_sdk.systemManager.api.getDeviceById(self.getScryptedProperty("id")) msos = await real_device.getVideoStreamOptions() if any(["prebuffer" in m for m in msos]): self.logger.info("Getting snapshot from prebuffer") @@ -94,6 +148,65 @@ async def getVideoStream(self, options=None): return await scrypted_sdk.mediaManager.createMediaObject(str.encode(rtsp_url), ScryptedMimeTypes.Url.value) + async def startIntercom(self, media): + self.logger.info("Starting intercom") + + ffmpeg_params = json.loads(await scrypted_sdk.mediaManager.convertMediaObjectToBuffer(media, ScryptedMimeTypes.FFmpegInput.value)) + self.logger.debug(f"Received ffmpeg params: {ffmpeg_params}") + + session_id, ice_servers = self.provider.arlo.StartPushToTalk(self.arlo_basestation, self.arlo_device) + self.logger.debug(f"Received ice servers: {[ice['url'] for ice in ice_servers]}") + + ice_servers = [ + RTCIceServer(urls=ice["url"], credential=ice.get("credential"), username=ice.get("username")) + for ice in ice_servers + ] + ice_gatherer = RTCIceGatherer(ice_servers) + await ice_gatherer.gather() + + local_candidates = [ + f"candidate:{Candidate.to_sdp(candidate_to_aioice(candidate))}" + for candidate in ice_gatherer.getLocalCandidates() + ] + + log_candidates = '\n'.join(local_candidates) + self.logger.info(f"Local candidates:\n{log_candidates}") + + # MediaPlayer/PyAV will block until the intercom stream starts, and it seems that scrypted waits + # for startIntercom to exit before sending data. So, let's do the remaining setup in a coroutine + # so this function can return early. + # This is required even if we use BackgroundRTCPeerConnection, since setting up MediaPlayer may + # block the background thread's event loop and prevent other async functions from running. + async def async_setup(): + pc = self.pc = BackgroundRTCPeerConnection() + self.sdp_answered = False + + pc.add_rtsp_audio(ffmpeg_params["url"]) + + offer = await pc.createOffer() + self.logger.info(f"Arlo offer sdp:\n{offer.sdp}") + + await pc.setLocalDescription(offer) + + self.provider.arlo.NotifyPushToTalkSDP( + self.arlo_basestation, self.arlo_device, + session_id, offer.sdp + ) + for candidate in local_candidates: + self.provider.arlo.NotifyPushToTalkCandidate( + self.arlo_basestation, self.arlo_device, + session_id, candidate + ) + + self.create_task(async_setup()) + + async def stopIntercom(self): + self.logger.info("Stopping intercom") + if self.pc: + await self.pc.close() + self.pc = None + self.sdp_answered = False + def _update_device_details(self, arlo_device): """For updating device details from the Arlo dictionary retrieved from Arlo's REST API. """ diff --git a/plugins/arlo/src/arlo_plugin/doorbell.py b/plugins/arlo/src/arlo_plugin/doorbell.py index d6760a0c7a..a0c8a56ba3 100644 --- a/plugins/arlo/src/arlo_plugin/doorbell.py +++ b/plugins/arlo/src/arlo_plugin/doorbell.py @@ -2,10 +2,13 @@ from .camera import ArloCamera + class ArloDoorbell(ArloCamera, BinarySensor): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.logger_name = f"{self.nativeId}.doorbell" + self.start_doorbell_subscription() def start_doorbell_subscription(self): @@ -13,4 +16,6 @@ def callback(doorbellPressed): self.binaryState = doorbellPressed return self.stop_subscriptions - self.provider.arlo.SubscribeToDoorbellEvents(self.arlo_basestation, self.arlo_device, callback) \ No newline at end of file + self.register_task( + self.provider.arlo.SubscribeToDoorbellEvents(self.arlo_basestation, self.arlo_device, callback) + ) \ No newline at end of file diff --git a/plugins/arlo/src/arlo_plugin/logging.py b/plugins/arlo/src/arlo_plugin/logging.py index babc34f8c3..76b5aebe32 100644 --- a/plugins/arlo/src/arlo_plugin/logging.py +++ b/plugins/arlo/src/arlo_plugin/logging.py @@ -1,5 +1,6 @@ import logging + class ScryptedDeviceLoggingWrapper(logging.Handler): scrypted_device = None @@ -10,6 +11,7 @@ def __init__(self, scrypted_device): def emit(self, record): self.scrypted_device.print(self.format(record)) + def createScryptedLogger(scrypted_device, name): logger = logging.getLogger(name) if logger.hasHandlers(): @@ -21,7 +23,7 @@ def createScryptedLogger(scrypted_device, name): sh = ScryptedDeviceLoggingWrapper(scrypted_device) # log formatting - fmt = logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s") + fmt = logging.Formatter("(arlo) %(levelname)s:%(name)s:%(asctime)s.%(msecs)03d %(message)s", "%H:%M:%S") sh.setFormatter(fmt) # configure handler to logger @@ -29,6 +31,7 @@ def createScryptedLogger(scrypted_device, name): return logger + class ScryptedDeviceLoggerMixin: _logger = None logger_name = None diff --git a/plugins/arlo/src/arlo_plugin/provider.py b/plugins/arlo/src/arlo_plugin/provider.py index 85a84aa5db..a765bdb55e 100644 --- a/plugins/arlo/src/arlo_plugin/provider.py +++ b/plugins/arlo/src/arlo_plugin/provider.py @@ -13,8 +13,11 @@ from .camera import ArloCamera from .doorbell import ArloDoorbell from .logging import ScryptedDeviceLoggerMixin +from .util import BackgroundTaskMixin +from .rtcpeerconnection import logger as background_rtc_logger -class ArloProvider(ScryptedDeviceBase, Settings, DeviceProvider, DeviceDiscovery, ScryptedDeviceLoggerMixin): + +class ArloProvider(ScryptedDeviceBase, Settings, DeviceProvider, DeviceDiscovery, ScryptedDeviceLoggerMixin, BackgroundTaskMixin): arlo_cameras = None arlo_basestations = None _arlo_mfa_code = None @@ -32,7 +35,7 @@ class ArloProvider(ScryptedDeviceBase, Settings, DeviceProvider, DeviceDiscovery def __init__(self, nativeId=None): super().__init__(nativeId=nativeId) - self.logger_name = "ArloProvider" + self.logger_name = "provider" self.arlo_cameras = {} self.arlo_basestations = {} @@ -93,7 +96,7 @@ def arlo(self): self.storage.setItem("arlo_auth_headers", json.dumps(dict(self._arlo.request.session.headers.items()))) self.storage.setItem("arlo_user_id", self._arlo.user_id) - asyncio.get_event_loop().create_task(self.do_arlo_setup()) + self.create_task(self.do_arlo_setup()) return self._arlo @@ -108,7 +111,7 @@ def arlo(self): self._arlo.UseExistingAuth(self.arlo_user_id, json.loads(headers)) self.logger.info(f"Initialized Arlo client, reusing stored auth headers") - asyncio.get_event_loop().create_task(self.do_arlo_setup()) + self.create_task(self.do_arlo_setup()) return self._arlo else: self._arlo_mfa_complete_auth = self._arlo.LoginMFA() @@ -138,6 +141,15 @@ async def do_arlo_setup(self): except Exception as e: self.logger.error(f"Error performing post-login Arlo setup: {type(e)} with message {str(e)}") + def invalidate_arlo_client(self): + if self.arlo is not None: + self._arlo.Unsubscribe() + self._arlo = None + self._arlo_mfa_code = None + self._arlo_mfa_complete_auth = None + self.storage.setItem("arlo_auth_headers", "") + self.storage.setItem("arlo_user_id", "") + def get_current_log_level(self): return ArloProvider.plugin_verbosity_choices[self.plugin_verbosity] @@ -148,6 +160,7 @@ def propagate_verbosity(self): for _, device in self.scrypted_devices.items(): device.logger.setLevel(log_level) arlo_lib_logger.setLevel(log_level) + background_rtc_logger.setLevel(log_level) def propagate_transport(self): self.print(f"Setting plugin transport to {self.arlo_transport}") @@ -171,6 +184,13 @@ async def getSettings(self): "title": "Two Factor Code", "description": "Enter the code sent by Arlo to your email or phone number.", }, + { + "key": "force_reauth", + "title": "Force Re-Authentication", + "description": "Resets the authentication flow of the plugin. Will also re-do 2FA.", + "value": "No", + "choices": ["No", "Yes"], + }, { "key": "arlo_transport", "title": "Underlying Transport Protocol", @@ -190,6 +210,9 @@ async def getSettings(self): async def putSetting(self, key, value): if key == "arlo_mfa_code": self._arlo_mfa_code = value + elif key == "force_reauth": + # force arlo client to be invalidated and reloaded + self.invalidate_arlo_client() else: self.storage.setItem(key, value) @@ -204,13 +227,7 @@ async def putSetting(self, key, value): self._arlo = None else: # force arlo client to be invalidated and reloaded - if self.arlo is not None: - self._arlo.Unsubscribe() - self._arlo = None - self._arlo_mfa_code = None - self._arlo_mfa_complete_auth = None - self.storage.setItem("arlo_auth_headers", "") - self.storage.setItem("arlo_user_id", "") + self.invalidate_arlo_client() # initialize Arlo client or continue MFA _ = self.arlo @@ -246,14 +263,11 @@ async def discoverDevices(self, duration=0): }, "nativeId": camera["deviceId"], "name": camera["deviceName"], - "interfaces": self.get_interfaces_by_model(camera['properties']['modelId']), + "interfaces": self.get_interfaces(camera), "type": ScryptedDeviceType.Camera.value, "providerNativeId": self.nativeId, } - if camera['deviceType'] == 'doorbell': - device["interfaces"].append(ScryptedInterface.BinarySensor.value) - devices.append(device) if camera["deviceId"] == camera["parentId"]: @@ -293,20 +307,25 @@ def createCamera(self, nativeId): else: return ArloCamera(nativeId, arlo_camera, arlo_basestation, self) - def get_interfaces_by_model(self, model_id): - model_id = model_id.lower() + def get_interfaces(self, camera): + model_id = camera['properties']['modelId'].lower() self.logger.debug(f"Checking applicable scrypted interfaces for {model_id}") - if model_id.startswith("avd1001"): - return [ - ScryptedInterface.VideoCamera.value, - ScryptedInterface.Camera.value, - ScryptedInterface.MotionSensor.value, - ] - - return [ + results = [ ScryptedInterface.VideoCamera.value, ScryptedInterface.Camera.value, ScryptedInterface.MotionSensor.value, + ScryptedInterface.Intercom.value, ScryptedInterface.Battery.value, - ] \ No newline at end of file + ] + + if model_id.startswith("avd1001"): + results.remove(ScryptedInterface.Battery.value) + + if camera['deviceType'] == 'doorbell': + results.append(ScryptedInterface.BinarySensor.value) + + if camera["deviceId"] == camera["parentId"]: + results.remove(ScryptedInterface.Intercom.value) + + return results \ No newline at end of file diff --git a/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py b/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py new file mode 100644 index 0000000000..61d93998f7 --- /dev/null +++ b/plugins/arlo/src/arlo_plugin/rtcpeerconnection.py @@ -0,0 +1,124 @@ +from aiortc import RTCPeerConnection +from aiortc.contrib.media import MediaPlayer +import asyncio +import inspect +import threading +import logging +import queue +import sys + + +# construct logger instance to be used by BackgroundRTCPeerConnection +logger = logging.getLogger("rtc") +logger.setLevel(logging.INFO) + +# output logger to stdout +ch = logging.StreamHandler(sys.stdout) + +# log formatting +fmt = logging.Formatter("(arlo) %(levelname)s:%(name)s:%(asctime)s.%(msecs)03d %(message)s", "%H:%M:%S") +ch.setFormatter(fmt) + +# configure handler to logger +logger.addHandler(ch) + + +class BackgroundRTCPeerConnection: + """Proxy class to use RTCPeerConnection in a background thread. + + The purpose of this proxy is to ensure that RTCPeerConnection operations + do not block the main asyncio thread. From testing, it seems that the + close() function blocks until the source RTSP server exits, which we + have no control over. Additionally, since asyncio coroutines are tied + to the event loop they were constructed from, it is not possible to only + run close() in a separate thread. Therefore, each instance of RTCPeerConnection + is launched within its own ephemeral thread, which cleans itself up once + close() completes. + """ + + def __init__(self): + self.main_loop = asyncio.get_event_loop() + self.background_loop = asyncio.new_event_loop() + + self.thread_started = queue.Queue(1) + self.thread = threading.Thread(target=self.__background_main) + self.thread.start() + self.thread_started.get() + + def __background_main(self): + logger.debug(f"Background RTC loop {self.thread.name} starting") + self.pc = RTCPeerConnection() + + asyncio.set_event_loop(self.background_loop) + self.thread_started.put(True) + self.background_loop.run_forever() + + logger.debug(f"Background RTC loop {self.thread.name} exiting") + + async def __run_background(self, coroutine, await_result=True, stop_loop=False): + fut = self.main_loop.create_future() + + def background_callback(): + # callback to run on main_loop. + def to_main(result, is_error): + if is_error: + fut.set_exception(result) + else: + fut.set_result(result) + + # callback to run on background_loop., after the coroutine completes + def callback(task): + is_error = False + if task.exception(): + result = task.exception() + is_error = True + else: + result = task.result() + + # send results to the main loop + self.main_loop.call_soon_threadsafe(to_main, result, is_error) + + # stopping the loop here ensures that the coroutine completed + # and doesn't raise any "task not awaited" exceptions + if stop_loop: + self.background_loop.stop() + + task = self.background_loop.create_task(coroutine) + task.add_done_callback(callback) + + # start the callback in the background loop + self.background_loop.call_soon_threadsafe(background_callback) + + if not await_result: + return None + return await fut + + async def createOffer(self): + return await self.__run_background(self.pc.createOffer()) + + async def setLocalDescription(self, sdp): + return await self.__run_background(self.pc.setLocalDescription(sdp)) + + async def setRemoteDescription(self, sdp): + return await self.__run_background(self.pc.setRemoteDescription(sdp)) + + async def addIceCandidate(self, candidate): + return await self.__run_background(self.pc.addIceCandidate(candidate)) + + async def close(self): + await self.__run_background(self.pc.close(), await_result=False, stop_loop=True) + + def add_rtsp_audio(self, rtsp_url): + """Adds an audio track to the RTCPeerConnection given a source RTSP url. + + This constructs a MediaPlayer in the background thread's asyncio loop, + since MediaPlayer also utilizes coroutines and asyncio. + + Note that this may block the background thread's event loop if the RTSP + server is not yet ready. + """ + def add_rtsp_audio_background(): + media_player = MediaPlayer(rtsp_url, format="rtsp") + self.pc.addTrack(media_player.audio) + + self.background_loop.call_soon_threadsafe(add_rtsp_audio_background) \ No newline at end of file diff --git a/plugins/arlo/src/arlo_plugin/util.py b/plugins/arlo/src/arlo_plugin/util.py new file mode 100644 index 0000000000..1ec24fa588 --- /dev/null +++ b/plugins/arlo/src/arlo_plugin/util.py @@ -0,0 +1,22 @@ +import asyncio + + +class BackgroundTaskMixin: + def create_task(self, coroutine): + task = asyncio.get_event_loop().create_task(coroutine) + self.register_task(task) + return task + + def register_task(self, task): + if not hasattr(self, "background_tasks"): + self.background_tasks = set() + + assert task is not None + + def print_exception(task): + if task.exception(): + self.logger.error(f"task exception: {task.exception()}") + + self.background_tasks.add(task) + task.add_done_callback(print_exception) + task.add_done_callback(self.background_tasks.discard) \ No newline at end of file diff --git a/plugins/arlo/src/requirements.txt b/plugins/arlo/src/requirements.txt index 6a87104a00..007228234b 100644 --- a/plugins/arlo/src/requirements.txt +++ b/plugins/arlo/src/requirements.txt @@ -1,3 +1,7 @@ paho-mqtt==1.6.1 sseclient==0.0.22 -requests \ No newline at end of file +requests +aiortc==1.3.2 +cryptography<39.0.0 +--extra-index-url=https://bjia56.github.io/scrypted-python-wheels/ +--no-binary=cffi \ No newline at end of file