Skip to content

Commit

Permalink
Code refactoring - New UserSession and UserProfile
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucino772 authored and lpalmisa committed Mar 17, 2021
1 parent 58a23cf commit cc320dc
Show file tree
Hide file tree
Showing 23 changed files with 325 additions and 373 deletions.
3 changes: 3 additions & 0 deletions mojang/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .auth.session import user
from .user.profile import profile
from .user.api import *
File renamed without changes.
16 changes: 16 additions & 0 deletions mojang/auth/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from ..urls import MOJANG_API

def is_secure(session):
url = MOJANG_API.join('user/security/location')
response = session._request('get', url)
return response.status_code == 204

def get_challenges(session):
url = MOJANG_API.join('user/security/challenges')
response = session._request('get', url)
return response.json()

def verify_ip(session, answers: list):
url = MOJANG_API.join('user/security/location')
response = session._request('post', url, json=answers)
return response.status_code == 204
65 changes: 65 additions & 0 deletions mojang/auth/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import requests
from . import yggdrasil
from . import security
from ..user.profile import UserProfile


class UserSession:

def __init__(self, username: str, password: str):
self.__session = requests.Session()
self.__session.headers.update({'Content-Type': 'application/json'})

self.__access_token = None
self.__client_token = None

self.__username = username
self.__password = password

self.__profile = None

def _request(self, method: str, url: str, **kwargs):
_fct = getattr(self.__session, method)
return _fct(url, **kwargs)

def _update_token(self, **kwargs):
self.__access_token = kwargs.get('access_token', self.__access_token)
self.__client_token = kwargs.get('client_token', self.__client_token)
if self.__access_token is None:
self.__session.headers.pop('Authorization')
else:
self.__session.headers.update({'Authorization': f'Bearer {self.__access_token}'})

# Connection / Disconnection
def connect(self):
if self.__password is not None:
yggdrasil.authenticate(self, self.__username, self.__password)
if self.secure:
self.__profile = UserProfile(self.__session, authenticated=self.secure, load=True)

def close(self):
yggdrasil.invalidate(self, self.__access_token, self.__client_token)

def close_all(self):
yggdrasil.signout(self, self.__username, self.__password)

# Security
@property
def secure(self):
return security.is_secure(self)

@property
def challenges(self):
return security.get_challenges(self)

def verify(self, answers: list):
return security.verify_ip(self, answers)

# Other
@property
def profile(self):
return self.__profile


def user(username: str, password: str):
return UserSession(username, password)
68 changes: 68 additions & 0 deletions mojang/auth/yggdrasil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from ..urls import MOJANG_AUTHSERVER

def authenticate(session, username: str, password: str, client_token=None):
url = MOJANG_AUTHSERVER.join('authenticate')
payload = {
'username': username,
'password': password,
'clientToken': client_token
}

response = session._request('post', url, json=payload)
if response.status_code == 200:
data = response.json()
session._update_token(access_token=data['accessToken'], client_token=data['clientToken'])
else:
pass

def refresh(session, access_token: str, client_token: str):
url = MOJANG_AUTHSERVER.join('refresh')
payload = {
'accessToken': access_token,
'clientToken': client_token
}

response = session._request('post', url, json=payload)
if response.status_code == 200:
data = response.json()
session._update_token(access_token=data['accessToken'], client_token=data['clientToken'])
else:
pass

def validate(session, access_token: str, client_token: str):
url = MOJANG_AUTHSERVER.join('validate')
payload = {
'accessToken': access_token,
'clientToken': client_token
}

response = session._request('post', url, json=payload)
return response.status_code == 204

def signout(session, username: str, password: str):
url = MOJANG_AUTHSERVER.join('signout')
payload = {
'username': username,
'password': password
}

response = session._request('post', url, json=payload)
if response.status_code == 204:
session._update_token(access_token=None)
return True
else:
return False

def invalidate(session, access_token: str, client_token: str):
url = MOJANG_AUTHSERVER.join('invalidate')
payload = {
'accessToken': access_token,
'clientToken': client_token
}

response = session._request('post', url, json=payload)
if response.status_code == 204:
session._update_token(access_token=None)
return True
else:
return False
File renamed without changes.
Empty file added mojang/user/__init__.py
Empty file.
56 changes: 56 additions & 0 deletions mojang/user/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import requests
from ..urls import MOJANG_STATUS, MOJANG_API, MOJANG_SESSION

def status(service=None):
data = {}
response = requests.get(MOJANG_STATUS.join('check'))
if response.status_code == 200:
for status in response.json():
data.update(status)

if service:
return data[service]

return data

def names(player_id: str):
url = MOJANG_API.join('user/profiles/{}/names'.format(player_id))
response = requests.get(url)

names = []
if response.status_code == 200:
data = response.json()

for item in response.json():
if 'changedToAt' in item:
item['changedToAt'] = dt.datetime.fromtimestamp(item['changedToAt'])
names.append((item['name'], item.get('changedToAt',None)))

return names

def uuid(username: str, timestamp=None, only_uuid=True):
url = MOJANG_API.join('users/profiles/minecraft/{}'.format(username))
params = {'at': timestamp} if timestamp else {}

data = {}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
if only_uuid:
return data['id']

return data

def uuids(usernames: list, only_uuid=True):
url = MOJANG_API.join('profiles/minecraft')
data = []

if len(usernames) > 0:
response = requests.post(url, json=usernames)

if response.status_code == 200:
data = response.json()
if only_uuid:
data = list(map(lambda pdata: pdata['id'], data))

return data
File renamed without changes.
116 changes: 116 additions & 0 deletions mojang/user/profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import requests
import datetime as dt
import json
from base64 import urlsafe_b64decode
from ..urls import MINECRAFT_SERVICES, MOJANG_SESSION
from .skin import Skin
from .cape import Cape
from . import api

class UserProperty:

def __init__(self, name: str, fct):
self.__name = name
self.__fct = fct

def __get__(self, obj, cls):
property_name = f'_{cls.__name__}__{self.__name}'
if hasattr(obj, property_name):
return getattr(obj, property_name)
else:
getattr(obj, self.__fct.__name__)()
return getattr(obj, property_name)

class UserProfile:

def __init__(self, session: requests.Session, username=None, authenticated=False, load=False):
self.__session = session
self.__username = username
self.__authenticated = authenticated

if self.__authenticated:
self._load_profile()
self.__username = self.__name
self._load_uuid()
else:
self._load_uuid()

if load:
self._load_names()
self._load_name_change()
if not self.__authenticated:
self._load_profile()

def _load_uuid(self):
data = api.uuid(self.__username, only_uuid=False)
self.__user_id = data['id']
self.__name = data['name']
self.__legacy = data.get('legacy', False)
self.__demo = data.get('demo', False)

def _load_names(self):
self.__names = api.names(self.__user_id)

def _load_name_change(self):
if self.__authenticated:
url = MINECRAFT_SERVICES.join('minecraft/profile/namechange')
response = self.__session.get(url)
if response.status_code == 200:
data = response.json()
self.__created_at = dt.datetime.strptime(data['createdAt'], '%Y-%m-%dT%H:%M:%SZ')
self.__name_change_allowed = data['nameChangeAllowed']
else:
pass
else:
self.__created_at = None
self.__name_change_allowed = None

def _load_profile(self):
if self.__authenticated:
url = MINECRAFT_SERVICES.join('minecraft/profile')
response = self.__session.get(url)

if response.status_code == 200:
data = response.json()
self.__user_id = data['id']
self.__name = data['name']

self.__skins = []
self.__capes = []

for skin in data['skins']:
self.__skins.append(Skin(skin['url'], skin['variant'].lower()))

for cape in data['capes']:
self.__capes.append(Cape(cape['url']))
else:
url = MOJANG_SESSION.join('session/minecraft/profile/{}'.format(self.__user_id))
response = self.__session.get(url)

if response.status_code == 200:
data = response.json()
self.__skins = []
self.__capes = []

for d in data['properties']:
textures = json.loads(urlsafe_b64decode(d['value']))['textures']
if 'SKIN' in textures.keys():
self.__skins.append(Skin(textures['SKIN']['url'], textures['SKIN'].get('metadata',{}).get('model','classic')))
if 'CAPE' in textures.keys():
self.__capes.append(Cape(textures['CAPE']['url']))

name = UserProperty('name', _load_uuid)
uuid = UserProperty('user_id', _load_uuid)
is_legacy = UserProperty('legacy', _load_uuid)
is_demo = UserProperty('demo', _load_uuid)

names = UserProperty('names', _load_names)

created_at = UserProperty('created_at', _load_name_change)
name_change_allowed = UserProperty('name_change_allowed', _load_name_change)

skins = UserProperty('skins', _load_profile)
capes = UserProperty('capes', _load_profile)

def profile(username: str, load=False):
return UserProfile(requests.Session(), username=username, load=load)
File renamed without changes.
Empty file added mojang/utils/__init__.py
Empty file.
File renamed without changes.
2 changes: 0 additions & 2 deletions pymojang/auth/__init__.py

This file was deleted.

26 changes: 0 additions & 26 deletions pymojang/auth/security_check.py

This file was deleted.

Loading

0 comments on commit cc320dc

Please sign in to comment.