Skip to content

Commit

Permalink
beta: arlo talkback (basestation only) (koush#475)
Browse files Browse the repository at this point in the history
* initial research

* experimental

* sdp seems to work, stopIntercom is broken

* converting to separate rtc device

* standalone rtc speaker implemented but not working

* minor cleanup

* webrtc troubleshooting and improvements

* wait for sdp response before sending candidates

* logging changes, rtc troubleshooting

* use a future

* restore aiortc and run RTC in a background thread

* documentation

* documentation

* typo

* comments + simplify background coroutine calls

* simplify future

* only enable intercom for basestation cameras

* bump 0.4.0

* backward compatible headers

* bump 0.4.1

* pin cryptography to latest without rust

* monkey patch binary deps and pin cryptography for armv7l

* hacks to use ffmpeg but fails due to dependency on pylibsrtp

* revert back to 0.4.2

* 0.4.3 fix for M1 Macs

* use pre-built armv7l wheels

* publish 0.4.4

* use custom pypi index for armv7l wheels
  • Loading branch information
bjia56 authored and nberardi committed Feb 24, 2023
1 parent 58fd368 commit 6a2520e
Show file tree
Hide file tree
Showing 13 changed files with 466 additions and 54 deletions.
6 changes: 3 additions & 3 deletions plugins/arlo/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugins/arlo/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/arlo",
"version": "0.3.7",
"version": "0.4.5",
"description": "Arlo Plugin for Scrypted",
"keywords": [
"scrypted",
Expand Down
140 changes: 131 additions & 9 deletions plugins/arlo/src/arlo_plugin/arlo/arlo_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ def float2hex(f):

def UseExistingAuth(self, user_id, headers):
self.user_id = user_id
if "Content-Type" not in headers:
headers['Content-Type'] = 'application/json; charset=UTF-8'
self.request.session.headers.update(headers)
self.BASE_URL = 'myapi.arlo.com'

Expand Down Expand Up @@ -178,6 +180,7 @@ def complete_auth(code):
'Auth-Version': '2',
'Authorization': finish_auth_body['data']['token'],
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_1_2 like Mac OS X) AppleWebKit/604.3.5 (KHTML, like Gecko) Mobile/15B202 NETGEAR/v1 (iOS Vuezone)',
'Content-Type': 'application/json; charset=UTF-8',
}
self.request.session.headers.update(headers)
self.BASE_URL = 'myapi.arlo.com'
Expand Down Expand Up @@ -336,6 +339,8 @@ def callback(self, event)
This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents()
that has a big switch statement in it to handle all the various events Arlo produces.
Returns the Task object that contains the subscription loop.
"""
resource = f"cameras/{camera.get('deviceId')}"

Expand All @@ -348,7 +353,7 @@ def callbackwrapper(self, event):
return None
return stop

asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))

def SubscribeToBatteryEvents(self, basestation, camera, callback):
"""
Expand All @@ -359,6 +364,8 @@ def callback(self, event)
This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents()
that has a big switch statement in it to handle all the various events Arlo produces.
Returns the Task object that contains the subscription loop.
"""
resource = f"cameras/{camera.get('deviceId')}"

Expand All @@ -371,7 +378,7 @@ def callbackwrapper(self, event):
return None
return stop

asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))

def SubscribeToDoorbellEvents(self, basestation, doorbell, callback):
"""
Expand All @@ -382,6 +389,8 @@ def callback(self, event)
This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents()
that has a big switch statement in it to handle all the various events Arlo produces.
Returns the Task object that contains the subscription loop.
"""

resource = f"doorbells/{doorbell.get('deviceId')}"
Expand All @@ -403,7 +412,59 @@ def callbackwrapper(self, event):
return None
return stop

asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))
return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['is'], callbackwrapper))

def SubscribeToSDPAnswers(self, basestation, camera, callback):
"""
Use this method to subscribe to pushToTalk SDP answer events. You must provide a callback function which will get called once per SDP event.
The callback function should have the following signature:
def callback(self, event)
This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents()
that has a big switch statement in it to handle all the various events Arlo produces.
Returns the Task object that contains the subscription loop.
"""

resource = f"cameras/{camera.get('deviceId')}"

def callbackwrapper(self, event):
properties = event.get("properties", {})
stop = None
if properties.get("type") == "answerSdp":
stop = callback(properties.get("data"))
if not stop:
return None
return stop

return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper))

def SubscribeToCandidateAnswers(self, basestation, camera, callback):
"""
Use this method to subscribe to pushToTalk ICE candidate answer events. You must provide a callback function which will get called once per candidate event.
The callback function should have the following signature:
def callback(self, event)
This is an example of handling a specific event, in reality, you'd probably want to write a callback for HandleEvents()
that has a big switch statement in it to handle all the various events Arlo produces.
Returns the Task object that contains the subscription loop.
"""

resource = f"cameras/{camera.get('deviceId')}"

def callbackwrapper(self, event):
properties = event.get("properties", {})
stop = None
if properties.get("type") == "answerCandidate":
stop = callback(properties.get("data"))
if not stop:
return None
return stop

return asyncio.get_event_loop().create_task(self.HandleEvents(basestation, resource, ['pushToTalk'], callbackwrapper))

async def HandleEvents(self, basestation, resource, actions, callback):
"""
Expand Down Expand Up @@ -443,13 +504,14 @@ async def TriggerAndHandleEvent(self, basestation, resource, actions, trigger, c
This function will allow you to potentially write a callback that can handle all of the events received from the event stream.
NOTE: Use this function if you need to run some code after subscribing to the eventstream, but before your callback to handle the events runs.
"""
if not callable(trigger):
if trigger is not None and not callable(trigger):
raise Exception('The trigger(self, camera) should be a callable function.')
if not callable(callback):
raise Exception('The callback(self, event) should be a callable function.')

await self.Subscribe()
trigger(self)
if trigger:
trigger(self)

# NOTE: Calling HandleEvents() calls Subscribe() again, which basically turns into a no-op. Hackie I know, but it cleans up the code a bit.
return await self.HandleEvents(basestation, resource, actions, callback)
Expand Down Expand Up @@ -479,11 +541,57 @@ async def StartStream(self, basestation, camera):
It can be streamed with: ffmpeg -re -i 'rtsps://<url>' -acodec copy -vcodec copy test.mp4
The request to /users/devices/startStream returns: { url:rtsp://<url>:443/vzmodulelive?egressToken=b<xx>&userAgent=iOS&cameraId=<camid>}
"""
stream_url_dict = self.request.post(f'https://{self.BASE_URL}/hmsweb/users/devices/startStream', {"to":camera.get('parentId'),"from":self.user_id+"_web","resource":"cameras/"+camera.get('deviceId'),"action":"set","responseUrl":"", "publishResponse":True,"transId":self.genTransId(),"properties":{"activityState":"startUserStream","cameraId":camera.get('deviceId')}}, headers={"xcloudId":camera.get('xCloudId')})
stream_url_dict = self.request.post(
f'https://{self.BASE_URL}/hmsweb/users/devices/startStream',
{
"to": camera.get('parentId'),
"from": self.user_id + "_web",
"resource": "cameras/" + camera.get('deviceId'),
"action": "set",
"responseUrl": "",
"publishResponse": True,
"transId": self.genTransId(),
"properties": {
"activityState": "startUserStream",
"cameraId": camera.get('deviceId')
}
},
headers={"xcloudId":camera.get('xCloudId')}
)
return stream_url_dict['url'].replace("rtsp://", "rtsps://")

def StopStream(self, basestation, camera):
return self.request.post(f'https://{self.BASE_URL}/hmsweb/users/devices/stopStream', {"to":camera.get('parentId'),"from":self.user_id+"_web","resource":"cameras/"+camera.get('deviceId'),"action":"set","responseUrl":"", "publishResponse":True,"transId":self.genTransId(),"properties":{"activityState":"stopUserStream","cameraId":camera.get('deviceId')}}, headers={"xcloudId": camera.get('xCloudId')})
def StartPushToTalk(self, basestation, camera):
url = f'https://{self.BASE_URL}/hmsweb/users/devices/{self.user_id}_{camera.get("deviceId")}/pushtotalk'
resp = self.request.get(url)
return resp.get("uSessionId"), resp.get("data")

def NotifyPushToTalkSDP(self, basestation, camera, uSessionId, localSdp):
resource = f"cameras/{camera.get('deviceId')}"

self.Notify(basestation, {
"action": "pushToTalk",
"resource": resource,
"publishResponse": True,
"properties": {
"data": localSdp,
"type": "offerSdp",
"uSessionId": uSessionId
}
})

def NotifyPushToTalkCandidate(self, basestation, camera, uSessionId, localCandidate):
resource = f"cameras/{camera.get('deviceId')}"

self.Notify(basestation, {
"action": "pushToTalk",
"resource": resource,
"publishResponse": False,
"properties": {
"data": localCandidate,
"type": "offerCandidate",
"uSessionId": uSessionId
}
})

async def TriggerFullFrameSnapshot(self, basestation, camera):
"""
Expand All @@ -494,7 +602,21 @@ async def TriggerFullFrameSnapshot(self, basestation, camera):
resource = f"cameras/{camera.get('deviceId')}"

def trigger(self):
self.request.post(f"https://{self.BASE_URL}/hmsweb/users/devices/fullFrameSnapshot", {"to":camera.get("parentId"),"from":self.user_id+"_web","resource":"cameras/"+camera.get("deviceId"),"action":"set","publishResponse":True,"transId":self.genTransId(),"properties":{"activityState":"fullFrameSnapshot"}}, headers={"xcloudId":camera.get("xCloudId")})
self.request.post(
f"https://{self.BASE_URL}/hmsweb/users/devices/fullFrameSnapshot",
{
"to": camera.get("parentId"),
"from": self.user_id + "_web",
"resource": "cameras/" + camera.get("deviceId"),
"action": "set",
"publishResponse": True,
"transId": self.genTransId(),
"properties": {
"activityState": "fullFrameSnapshot"
}
},
headers={"xcloudId":camera.get("xCloudId")}
)

def callback(self, event):
properties = event.get("properties", {})
Expand Down
4 changes: 2 additions & 2 deletions plugins/arlo/src/arlo_plugin/arlo/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import sys

# construct logger instance to be used by package arlo
logger = logging.getLogger("arlo")
logger = logging.getLogger("lib")
logger.setLevel(logging.INFO)

# output logger to stdout
ch = logging.StreamHandler(sys.stdout)

# log formatting
fmt = logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s")
fmt = logging.Formatter("(arlo) %(levelname)s:%(name)s:%(asctime)s.%(msecs)03d %(message)s", "%H:%M:%S")
ch.setFormatter(fmt)

# configure handler to logger
Expand Down
2 changes: 1 addition & 1 deletion plugins/arlo/src/arlo_plugin/arlo/mqtt_stream_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def on_message(client, userdata, msg):
self.event_stream = mqtt.Client(client_id=f"user_{self.arlo.user_id}_{self._gen_client_number()}", transport="websockets", clean_session=False)
self.event_stream.username_pw_set(self.arlo.user_id, password=self.arlo.request.session.headers.get('Authorization'))
self.event_stream.ws_set_options(path="/mqtt", headers={"Origin": "https://my.arlo.com"})
self.event_stream.enable_logger(logger=logger)
#self.event_stream.enable_logger(logger=logger)
self.event_stream.on_connect = on_connect
self.event_stream.on_message = on_message
self.event_stream.tls_set()
Expand Down
4 changes: 2 additions & 2 deletions plugins/arlo/src/arlo_plugin/arlo/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ def _request(self, url, method='GET', params={}, headers={}, stream=False, raw=F
import logging
import http.client
http.client.HTTPConnection.debuglevel = 1
logging.basicConfig()
#logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
req_log = logging.getLogger('requests.packages.urllib3')
req_log.setLevel(logging.DEBUG)
req_log.propagate = True
"""
#"""

url = f'{url}?eventId={self.gen_event_id()}&time={self.get_time()}'

Expand Down
Loading

0 comments on commit 6a2520e

Please sign in to comment.