Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update user/pass authentication. #39

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 66 additions & 15 deletions tesla_api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import asyncio
import base64
import hashlib
import json
import re
import secrets
from datetime import datetime, timedelta
from urllib.parse import parse_qs, urlparse

import aiohttp

Expand All @@ -9,7 +14,7 @@
from .vehicle import Vehicle

TESLA_API_BASE_URL = "https://owner-api.teslamotors.com/"
TOKEN_URL = TESLA_API_BASE_URL + "oauth/token"
TOKEN_URL = "https://auth.tesla.com/oauth2/v3/authorize"
API_URL = TESLA_API_BASE_URL + "api/1"

OAUTH_CLIENT_ID = "81527cff06843c8634fdc09e8ac0abefb46ac849f38fe1e431c2ef2106796384"
Expand Down Expand Up @@ -48,27 +53,73 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
async def close(self):
await self._session.close()

async def _get_token(self, data):
request_data = {
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET,
async def _get_new_token(self):
code_verifier = secrets.token_urlsafe(64)
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).hexdigest().encode())
state = secrets.token_urlsafe(16)

params = {
"client_id": "ownerapi",
"code_challenge": code_challenge.decode(),
"code_challenge_method": "S256",
"redirect_uri": "https://auth.tesla.com/void/callback",
"response_type": "code",
"scope": "openid email offline_access",
"state": state
}
request_data.update(data)

async with self._session.post(TOKEN_URL, data=request_data) as resp:
response_json = await resp.json()
async with self._session.get(TOKEN_URL, params=params) as resp:
response_page = await resp.text()

input_fields = (f.group(1) for f in re.finditer(r"<input ([^>]+)>", response_page))
input_fields = ((re.search(r'name="(.*?)"', f), re.search(r'value="(.*?)"', f))
for f in input_fields)
form_data = {name.group(1): value.group(1) if value else ""
for name, value in input_fields}
form_data["identity"] = self._email
form_data["credential"] = self._password

async with self._session.post(TOKEN_URL, data=form_data, params=params, allow_redirects=False) as resp:
if resp.status == 401:
raise AuthenticationError(response_json)
raise AuthenticationError("Incorrect login")
if resp.status == 200:
page = await resp.text()
errors = json.loads(re.search(r"var messages = (.*);", page).group(1))
raise AuthenticationError(errors.get("_", errors))

redirect_location = resp.headers["Location"]
args = parse_qs(urlparse(redirect_location).query)
if args["state"][0] != state:
raise AuthenticationError("Incorrect state (possible CSRF attack).")

data = {
"grant_type": "authorization_code",
"client_id": "ownerapi",
"code": args["code"][0],
"code_verifier": code_verifier,
"redirect_uri": "https://auth.tesla.com/void/callback"
}
async with self._session.post("https://auth.tesla.com/oauth2/v3/token", json=data) as resp:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using magic URLs in mid code is not a good idea when having other (similar) URLs defined as static vars at the beginning/top.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hence cleanup still needed. This is just a first pass before I went to bed.

bearer_token = await resp.json()

params = {
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"client_id": OAUTH_CLIENT_ID,
"client_secret": OAUTH_CLIENT_SECRET
}
headers = {"Authorization": "Bearer {}".format(bearer_token["access_token"])}
async with self._session.post("https://owner-api.teslamotors.com/oauth/token",
headers=headers, params=params) as resp:
access_token = await resp.json()

# Replace the broken refresh token with the token from the previous step.
access_token["refresh_token"] = bearer_token["refresh_token"]

# Send token to application via callback.
if self._new_token_callback:
asyncio.create_task(self._new_token_callback(json.dumps(response_json)))
asyncio.create_task(self._new_token_callback(json.dumps(access_token)))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the interface here from the full token (in past) to just the (new) access_token might brake existing applications. At least it breaks mine. ;)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same token as before (just with the refresh token replaced).

I still need to test with the old tokens, but if you can point out how it's broken that might save me some time.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad I just read access_token and assumed it would be just the one. Maybe a name change of the var would be better.


return response_json

async def _get_new_token(self):
data = {"grant_type": "password", "email": self._email, "password": self._password}
return await self._get_token(data)
return access_token

async def _refresh_token(self, refresh_token):
data = {"grant_type": "refresh_token", "refresh_token": refresh_token}
Expand Down