-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update user/pass authentication. #39
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,11 @@ | ||
import asyncio | ||
import base64 | ||
import hashlib | ||
import json | ||
import re | ||
import secrets | ||
from datetime import datetime, timedelta | ||
from urllib.parse import parse_qs, urlparse | ||
|
||
import aiohttp | ||
|
||
|
@@ -9,7 +14,7 @@ | |
from .vehicle import Vehicle | ||
|
||
TESLA_API_BASE_URL = "https://owner-api.teslamotors.com/" | ||
TOKEN_URL = TESLA_API_BASE_URL + "oauth/token" | ||
TOKEN_URL = "https://auth.tesla.com/oauth2/v3/authorize" | ||
API_URL = TESLA_API_BASE_URL + "api/1" | ||
|
||
OAUTH_CLIENT_ID = "81527cff06843c8634fdc09e8ac0abefb46ac849f38fe1e431c2ef2106796384" | ||
|
@@ -48,27 +53,73 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): | |
async def close(self): | ||
await self._session.close() | ||
|
||
async def _get_token(self, data): | ||
request_data = { | ||
"client_id": OAUTH_CLIENT_ID, | ||
"client_secret": OAUTH_CLIENT_SECRET, | ||
async def _get_new_token(self): | ||
code_verifier = secrets.token_urlsafe(64) | ||
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).hexdigest().encode()) | ||
state = secrets.token_urlsafe(16) | ||
|
||
params = { | ||
"client_id": "ownerapi", | ||
"code_challenge": code_challenge.decode(), | ||
"code_challenge_method": "S256", | ||
"redirect_uri": "https://auth.tesla.com/void/callback", | ||
"response_type": "code", | ||
"scope": "openid email offline_access", | ||
"state": state | ||
} | ||
request_data.update(data) | ||
|
||
async with self._session.post(TOKEN_URL, data=request_data) as resp: | ||
response_json = await resp.json() | ||
async with self._session.get(TOKEN_URL, params=params) as resp: | ||
response_page = await resp.text() | ||
|
||
input_fields = (f.group(1) for f in re.finditer(r"<input ([^>]+)>", response_page)) | ||
input_fields = ((re.search(r'name="(.*?)"', f), re.search(r'value="(.*?)"', f)) | ||
for f in input_fields) | ||
form_data = {name.group(1): value.group(1) if value else "" | ||
for name, value in input_fields} | ||
form_data["identity"] = self._email | ||
form_data["credential"] = self._password | ||
|
||
async with self._session.post(TOKEN_URL, data=form_data, params=params, allow_redirects=False) as resp: | ||
if resp.status == 401: | ||
raise AuthenticationError(response_json) | ||
raise AuthenticationError("Incorrect login") | ||
if resp.status == 200: | ||
page = await resp.text() | ||
errors = json.loads(re.search(r"var messages = (.*);", page).group(1)) | ||
raise AuthenticationError(errors.get("_", errors)) | ||
|
||
redirect_location = resp.headers["Location"] | ||
args = parse_qs(urlparse(redirect_location).query) | ||
if args["state"][0] != state: | ||
raise AuthenticationError("Incorrect state (possible CSRF attack).") | ||
|
||
data = { | ||
"grant_type": "authorization_code", | ||
"client_id": "ownerapi", | ||
"code": args["code"][0], | ||
"code_verifier": code_verifier, | ||
"redirect_uri": "https://auth.tesla.com/void/callback" | ||
} | ||
async with self._session.post("https://auth.tesla.com/oauth2/v3/token", json=data) as resp: | ||
bearer_token = await resp.json() | ||
|
||
params = { | ||
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", | ||
"client_id": OAUTH_CLIENT_ID, | ||
"client_secret": OAUTH_CLIENT_SECRET | ||
} | ||
headers = {"Authorization": "Bearer {}".format(bearer_token["access_token"])} | ||
async with self._session.post("https://owner-api.teslamotors.com/oauth/token", | ||
headers=headers, params=params) as resp: | ||
access_token = await resp.json() | ||
|
||
# Replace the broken refresh token with the token from the previous step. | ||
access_token["refresh_token"] = bearer_token["refresh_token"] | ||
|
||
# Send token to application via callback. | ||
if self._new_token_callback: | ||
asyncio.create_task(self._new_token_callback(json.dumps(response_json))) | ||
asyncio.create_task(self._new_token_callback(json.dumps(access_token))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changing the interface here from the full token (in past) to just the (new) access_token might brake existing applications. At least it breaks mine. ;) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the same token as before (just with the refresh token replaced). I still need to test with the old tokens, but if you can point out how it's broken that might save me some time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my bad I just read access_token and assumed it would be just the one. Maybe a name change of the var would be better. |
||
|
||
return response_json | ||
|
||
async def _get_new_token(self): | ||
data = {"grant_type": "password", "email": self._email, "password": self._password} | ||
return await self._get_token(data) | ||
return access_token | ||
|
||
async def _refresh_token(self, refresh_token): | ||
data = {"grant_type": "refresh_token", "refresh_token": refresh_token} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using magic URLs in mid code is not a good idea when having other (similar) URLs defined as static vars at the beginning/top.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hence cleanup still needed. This is just a first pass before I went to bed.