From 71cd21faa2a25057eac005f891509f028d9d257f Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sun, 31 Jan 2021 17:19:16 +0000 Subject: [PATCH] Update user/pass authentication. --- tesla_api/__init__.py | 81 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/tesla_api/__init__.py b/tesla_api/__init__.py index 6d5ab02..be28da9 100644 --- a/tesla_api/__init__.py +++ b/tesla_api/__init__.py @@ -1,6 +1,11 @@ import asyncio +import base64 +import hashlib import json +import re +import secrets from datetime import datetime, timedelta +from urllib.parse import parse_qs, urlparse import aiohttp @@ -9,7 +14,7 @@ from .vehicle import Vehicle TESLA_API_BASE_URL = "https://owner-api.teslamotors.com/" -TOKEN_URL = TESLA_API_BASE_URL + "oauth/token" +TOKEN_URL = "https://auth.tesla.com/oauth2/v3/authorize" API_URL = TESLA_API_BASE_URL + "api/1" OAUTH_CLIENT_ID = "81527cff06843c8634fdc09e8ac0abefb46ac849f38fe1e431c2ef2106796384" @@ -48,27 +53,73 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): async def close(self): await self._session.close() - async def _get_token(self, data): - request_data = { - "client_id": OAUTH_CLIENT_ID, - "client_secret": OAUTH_CLIENT_SECRET, + async def _get_new_token(self): + code_verifier = secrets.token_urlsafe(64) + code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).hexdigest().encode()) + state = secrets.token_urlsafe(16) + + params = { + "client_id": "ownerapi", + "code_challenge": code_challenge.decode(), + "code_challenge_method": "S256", + "redirect_uri": "https://auth.tesla.com/void/callback", + "response_type": "code", + "scope": "openid email offline_access", + "state": state } - request_data.update(data) - async with self._session.post(TOKEN_URL, data=request_data) as resp: - response_json = await resp.json() + async with self._session.get(TOKEN_URL, params=params) as resp: + response_page = await resp.text() + + input_fields = (f.group(1) for f in re.finditer(r"]+)>", response_page)) + input_fields = ((re.search(r'name="(.*?)"', f), re.search(r'value="(.*?)"', f)) + for f in input_fields) + form_data = {name.group(1): value.group(1) if value else "" + for name, value in input_fields} + form_data["identity"] = self._email + form_data["credential"] = self._password + + async with self._session.post(TOKEN_URL, data=form_data, params=params, allow_redirects=False) as resp: if resp.status == 401: - raise AuthenticationError(response_json) + raise AuthenticationError("Incorrect login") + if resp.status == 200: + page = await resp.text() + errors = json.loads(re.search(r"var messages = (.*);", page).group(1)) + raise AuthenticationError(errors.get("_", errors)) + + redirect_location = resp.headers["Location"] + args = parse_qs(urlparse(redirect_location).query) + if args["state"][0] != state: + raise AuthenticationError("Incorrect state (possible CSRF attack).") + + data = { + "grant_type": "authorization_code", + "client_id": "ownerapi", + "code": args["code"][0], + "code_verifier": code_verifier, + "redirect_uri": "https://auth.tesla.com/void/callback" + } + async with self._session.post("https://auth.tesla.com/oauth2/v3/token", json=data) as resp: + bearer_token = await resp.json() + + params = { + "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", + "client_id": OAUTH_CLIENT_ID, + "client_secret": OAUTH_CLIENT_SECRET + } + headers = {"Authorization": "Bearer {}".format(bearer_token["access_token"])} + async with self._session.post("https://owner-api.teslamotors.com/oauth/token", + headers=headers, params=params) as resp: + access_token = await resp.json() + + # Replace the broken refresh token with the token from the previous step. + access_token["refresh_token"] = bearer_token["refresh_token"] # Send token to application via callback. if self._new_token_callback: - asyncio.create_task(self._new_token_callback(json.dumps(response_json))) + asyncio.create_task(self._new_token_callback(json.dumps(access_token))) - return response_json - - async def _get_new_token(self): - data = {"grant_type": "password", "email": self._email, "password": self._password} - return await self._get_token(data) + return access_token async def _refresh_token(self, refresh_token): data = {"grant_type": "refresh_token", "refresh_token": refresh_token}