From b924a07531274abed03216fb56ab5cd40fa96b2f Mon Sep 17 00:00:00 2001 From: Erik Sundell Date: Sun, 31 Oct 2021 12:10:18 +0100 Subject: [PATCH 1/4] pre-commit: adjust flake8 config to comply with black --- .flake8 | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.flake8 b/.flake8 index cae9b75..d6927b9 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,11 @@ [flake8] +# Adjustments to comply with black autoformatting as described here: +# https://black.readthedocs.io/en/latest/guides/using_black_with_other_tools.html#flake8 +# +extend-ignore = E203 +max-line-length = 88 + +# Project specific adjustments +# builtins = c From a72f9eea00d233eaa2698f19fc58cb62b646c67d Mon Sep 17 00:00:00 2001 From: Erik Sundell Date: Sun, 31 Oct 2021 12:12:57 +0100 Subject: [PATCH 2/4] pre-commit: run black hook without string normalization --- dev-jupyterhub_config.py | 4 +- docs/conf.py | 37 ++-- nativeauthenticator/crypto/crypto.py | 13 +- nativeauthenticator/crypto/encoding.py | 21 ++- nativeauthenticator/crypto/signing.py | 52 +++--- nativeauthenticator/handlers.py | 100 ++++++----- nativeauthenticator/nativeauthenticator.py | 161 ++++++++++-------- nativeauthenticator/orm.py | 3 +- .../tests/test_authenticator.py | 92 +++++----- nativeauthenticator/tests/test_orm.py | 8 +- 10 files changed, 265 insertions(+), 226 deletions(-) diff --git a/dev-jupyterhub_config.py b/dev-jupyterhub_config.py index 606f795..a21d5dc 100644 --- a/dev-jupyterhub_config.py +++ b/dev-jupyterhub_config.py @@ -29,9 +29,7 @@ c.NativeAuthenticator.allow_2fa = True -c.NativeAuthenticator.tos = ( - 'I agree to the TOS.' -) +c.NativeAuthenticator.tos = 'I agree to the TOS.' # c.NativeAuthenticator.recaptcha_key = "your key" # c.NativeAuthenticator.recaptcha_secret = "your secret" diff --git a/docs/conf.py b/docs/conf.py index 590ae55..2f7dddc 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -33,8 +33,7 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = [ -] +extensions = [] # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] @@ -105,15 +104,12 @@ # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', - # The font size ('10pt', '11pt' or '12pt'). # # 'pointsize': '10pt', - # Additional stuff for the LaTeX preamble. # # 'preamble': '', - # Latex figure (float) alignment # # 'figure_align': 'htbp', @@ -123,9 +119,13 @@ # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'NativeAuthenticator.tex', - 'Native Authenticator Documentation', - 'Leticia Portella', 'manual'), + ( + master_doc, + 'NativeAuthenticator.tex', + 'Native Authenticator Documentation', + 'Leticia Portella', + 'manual', + ), ] @@ -134,8 +134,13 @@ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [ - (master_doc, 'nativeauthenticator', 'Native Authenticator Documentation', - [author], 1) + ( + master_doc, + 'nativeauthenticator', + 'Native Authenticator Documentation', + [author], + 1, + ) ] @@ -145,9 +150,15 @@ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'NativeAuthenticator', 'Native Authenticator Documentation', - author, 'NativeAuthenticator', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + 'NativeAuthenticator', + 'Native Authenticator Documentation', + author, + 'NativeAuthenticator', + 'One line description of project.', + 'Miscellaneous', + ), ] diff --git a/nativeauthenticator/crypto/crypto.py b/nativeauthenticator/crypto/crypto.py index 6fee514..0858c00 100644 --- a/nativeauthenticator/crypto/crypto.py +++ b/nativeauthenticator/crypto/crypto.py @@ -10,6 +10,7 @@ class InvalidAlgorithm(ValueError): """Algorithm is not supported by hashlib.""" + pass @@ -28,8 +29,7 @@ def salted_hmac(key_salt, value, secret, *, algorithm='sha1'): hasher = getattr(hashlib, algorithm) except AttributeError as e: raise InvalidAlgorithm( - '%r is not an algorithm accepted by the hashlib module.' - % algorithm + '%r is not an algorithm accepted by the hashlib module.' % algorithm ) from e # We need to generate a derived key from our base key. We can do this by # passing the key_salt and our base key through a pseudo-random function. @@ -41,8 +41,7 @@ def salted_hmac(key_salt, value, secret, *, algorithm='sha1'): return hmac.new(key, msg=force_bytes(value), digestmod=hasher) -RANDOM_STRING_CHARS = \ - 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' +RANDOM_STRING_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' def get_random_string(length, allowed_chars=RANDOM_STRING_CHARS): @@ -71,8 +70,4 @@ def pbkdf2(password, salt, iterations, dklen=0, digest=None): dklen = dklen or None password = force_bytes(password) salt = force_bytes(salt) - return hashlib.pbkdf2_hmac(digest().name, - password, - salt, - iterations, - dklen) + return hashlib.pbkdf2_hmac(digest().name, password, salt, iterations, dklen) diff --git a/nativeauthenticator/crypto/encoding.py b/nativeauthenticator/crypto/encoding.py index bd62eb5..28233be 100644 --- a/nativeauthenticator/crypto/encoding.py +++ b/nativeauthenticator/crypto/encoding.py @@ -12,14 +12,15 @@ def __init__(self, obj, *args): def __str__(self): return '{}. You passed in {!r} ({})'.format( - super().__str__(), - self.obj, - type(self.obj)) + super().__str__(), self.obj, type(self.obj) + ) _PROTECTED_TYPES = ( type(None), - int, float, Decimal, + int, + float, + Decimal, datetime.datetime, datetime.date, datetime.time, @@ -89,10 +90,9 @@ def force_bytes(s, encoding='utf-8', strings_only=False, errors='strict'): # And then everything above 128, because bytes ≥ 128 are part of multibyte # Unicode characters. _hexdig = '0123456789ABCDEFabcdef' -_hextobyte.update({ - (a + b).encode(): bytes.fromhex(a + b) - for a in _hexdig[8:] for b in _hexdig -}) +_hextobyte.update( + {(a + b).encode(): bytes.fromhex(a + b) for a in _hexdig[8:] for b in _hexdig} +) def uri_to_iri(uri): @@ -165,9 +165,8 @@ def repercent_broken_unicode(path): except UnicodeDecodeError as e: # CVE-2019-14235: A recursion shouldn't be used since the exception # handling uses massive amounts of memory - repercent = quote(path[e.start:e.end], - safe=b"/#%[]=:;$&()+,!?*@'~") - path = path[:e.start] + repercent.encode() + path[e.end:] + repercent = quote(path[e.start : e.end], safe=b"/#%[]=:;$&()+,!?*@'~") + path = path[: e.start] + repercent.encode() + path[e.end :] else: return path diff --git a/nativeauthenticator/crypto/signing.py b/nativeauthenticator/crypto/signing.py index 4be7eae..74bc1d5 100644 --- a/nativeauthenticator/crypto/signing.py +++ b/nativeauthenticator/crypto/signing.py @@ -43,17 +43,18 @@ from .crypto import salted_hmac _SEP_UNSAFE = re.compile(r'^[A-z0-9-_=]*$') -BASE62_ALPHABET = \ - '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' +BASE62_ALPHABET = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' class BadSignature(Exception): """Signature does not match.""" + pass class SignatureExpired(BadSignature): """Signature timestamp is older than required max_age.""" + pass @@ -93,8 +94,8 @@ def b64_decode(s): def base64_hmac(salt, value, key, algorithm='sha1'): return b64_encode( - salted_hmac(salt, value, key, algorithm=algorithm).digest() - ).decode() + salted_hmac(salt, value, key, algorithm=algorithm).digest() + ).decode() class JSONSerializer: @@ -102,6 +103,7 @@ class JSONSerializer: Simple wrapper around json to be used in signing.dumps and signing.loads. """ + def dumps(self, obj): return json.dumps(obj, separators=(',', ':')).encode('latin-1') @@ -109,11 +111,9 @@ def loads(self, data): return json.loads(data.decode('latin-1')) -def dumps(obj, - key=None, - salt='django.core.signing', - serializer=JSONSerializer, - compress=False): +def dumps( + obj, key=None, salt='django.core.signing', serializer=JSONSerializer, compress=False +): """ Return URL-safe, hmac signed base64 compressed JSON string. If key is None, raises Exception. The hmac algorithm is the default @@ -130,24 +130,22 @@ def dumps(obj, The serializer is expected to return a bytestring. """ - return TimestampSigner(key, salt=salt).sign_object(obj, - serializer=serializer, - compress=compress) + return TimestampSigner(key, salt=salt).sign_object( + obj, serializer=serializer, compress=compress + ) -def loads(s, - key=None, - salt='django.core.signing', - serializer=JSONSerializer, - max_age=None): +def loads( + s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None +): """ Reverse of dumps(), raise BadSignature if signature fails. The serializer is expected to accept a bytestring. """ - return TimestampSigner(key, salt=salt).unsign_object(s, - serializer=serializer, - max_age=max_age) + return TimestampSigner(key, salt=salt).unsign_object( + s, serializer=serializer, max_age=max_age + ) class Signer: @@ -160,14 +158,14 @@ def __init__(self, key, sep=':', salt=None, algorithm=None): 'only A-z0-9-_=)' % sep, ) self.salt = salt or '{}.{}'.format( - self.__class__.__module__, self.__class__.__name__ - ) + self.__class__.__module__, self.__class__.__name__ + ) self.algorithm = algorithm or 'sha256' def signature(self, value): - return base64_hmac(self.salt + 'signer', - value, self.key, - algorithm=self.algorithm) + return base64_hmac( + self.salt + 'signer', value, self.key, algorithm=self.algorithm + ) def sign(self, value): return f'{value}{self.sep}{self.signature(value)}' @@ -220,7 +218,6 @@ def unsign_object(self, signed_obj, serializer=JSONSerializer, **kwargs): class TimestampSigner(Signer): - def timestamp(self): return b62_encode(int(time.time())) @@ -242,6 +239,5 @@ def unsign(self, value, max_age=None): # Check timestamp is not older than max_age age = time.time() - timestamp if age > max_age: - raise SignatureExpired( - f'Signature age {age} > {max_age} seconds') + raise SignatureExpired(f'Signature age {age} > {max_age} seconds') return value diff --git a/nativeauthenticator/handlers.py b/nativeauthenticator/handlers.py index 92a4b5c..683a3b6 100644 --- a/nativeauthenticator/handlers.py +++ b/nativeauthenticator/handlers.py @@ -7,11 +7,14 @@ from jinja2 import FileSystemLoader from jupyterhub.handlers import BaseHandler from jupyterhub.handlers.login import LoginHandler + try: from jupyterhub.scopes import needs_scope + admin_users_scope = needs_scope("admin:users") except ImportError: from jupyterhub.utils import admin_only + admin_users_scope = admin_only from tornado import web @@ -41,6 +44,7 @@ def __init__(self, *args, **kwargs): class SignUpHandler(LocalBase): """Render the sign in page.""" + async def get(self): if not self.authenticator.enable_signup: raise web.HTTPError(404) @@ -63,33 +67,41 @@ def get_result_message(self, user, taken, human=True): # Always error if username is taken. if taken: alert = 'alert-danger' - message = ("Something went wrong. It appears that this " - "username is already in use. Please try again " - "with a different username.") + message = ( + "Something went wrong. It appears that this " + "username is already in use. Please try again " + "with a different username." + ) else: # Error if user creation was not successful. if not user: alert = 'alert-danger' pw_len = self.authenticator.minimum_password_length if pw_len: - message = ("Something went wrong. Be sure your username " - "does not contain spaces or commas, your " - "password has at least {} characters and is " - "not too common.").format(pw_len) + message = ( + "Something went wrong. Be sure your username " + "does not contain spaces or commas, your " + "password has at least {} characters and is " + "not too common." + ).format(pw_len) else: - message = ("Something went wrong. Be sure your username " - "does not contain spaces or commas and your " - "password is not too common.") + message = ( + "Something went wrong. Be sure your username " + "does not contain spaces or commas and your " + "password is not too common." + ) # If user creation went through & open-signup is enabled, success. elif self.authenticator.open_signup: alert = 'alert-success' - message = ('The signup was successful. You can now go to ' - 'home page and log in the system') + message = ( + 'The signup was successful. You can now go to ' + 'home page and log in the system' + ) if not human: alert = 'alert-danger' - message = ("You failed the reCAPTCHA. Please try again") + message = "You failed the reCAPTCHA. Please try again" return alert, message @@ -101,14 +113,15 @@ async def post(self): url = "https://www.google.com/recaptcha/api/siteverify" if self.authenticator.recaptcha_key: - recaptcha_response = \ - self.get_body_argument('g-recaptcha-response', strip=True) + recaptcha_response = self.get_body_argument( + 'g-recaptcha-response', strip=True + ) if recaptcha_response == "": assume_human = False else: data = { 'secret': self.authenticator.recaptcha_secret, - 'response': recaptcha_response + 'response': recaptcha_response, } validation_status = requests.post(url, data=data) assume_human = validation_status.json().get("success") @@ -122,7 +135,7 @@ async def post(self): 'username': self.get_body_argument('username', strip=False), 'pw': self.get_body_argument('pw', strip=False), 'email': self.get_body_argument('email', '', strip=False), - 'has_2fa': bool(self.get_body_argument('2fa', '', strip=False)) + 'has_2fa': bool(self.get_body_argument('2fa', '', strip=False)), } taken = self.authenticator.user_exists(user_info['username']) user = self.authenticator.create_user(**user_info) @@ -153,6 +166,7 @@ async def post(self): class AuthorizationHandler(LocalBase): """Render the sign in page.""" + @admin_users_scope async def get(self): html = await self.render_template( @@ -177,7 +191,8 @@ async def get(self, slug): if self.authenticator.allow_self_approval_for: try: data = AuthorizeHandler.validate_slug( - slug, self.authenticator.secret_key) + slug, self.authenticator.secret_key + ) must_stop = False except ValueError: pass @@ -202,6 +217,7 @@ async def get(self, slug): @staticmethod def validate_slug(slug, key): from .crypto.signing import Signer, BadSignature + s = Signer(key) try: obj = s.unsign_object(slug) @@ -216,9 +232,9 @@ def validate_slug(slug, key): # before the T year_month_day = datestr.split("-") - dateobj = date(int(year_month_day[0]), - int(year_month_day[1]), - int(year_month_day[2])) + dateobj = date( + int(year_month_day[0]), int(year_month_day[1]), int(year_month_day[2]) + ) # after the T # manually parsing iso-8601 times with a colon in the timezone @@ -259,15 +275,14 @@ async def post(self): else: alert = 'alert-danger' pw_len = self.authenticator.minimum_password_length - msg = ('Something went wrong! Be sure your new ' - 'password has at least {} characters and is ' - 'not too common.').format(pw_len) + msg = ( + 'Something went wrong! Be sure your new ' + 'password has at least {} characters and is ' + 'not too common.' + ).format(pw_len) html = await self.render_template( - 'change-password.html', - user_name=user.name, - result_message=msg, - alert=alert + 'change-password.html', user_name=user.name, result_message=msg, alert=alert ) self.finish(html) @@ -292,26 +307,25 @@ async def post(self, user_name): if success: alert = 'alert-success' - msg = ('The password for {} has been changed ' - 'successfully').format(user_name) + msg = ('The password for {} has been changed ' 'successfully').format( + user_name + ) else: alert = 'alert-danger' pw_len = self.authenticator.minimum_password_length - msg = ('Something went wrong! Be sure the new password ' - 'for {} has at least {} characters and is ' - 'not too common.').format(user_name, pw_len) + msg = ( + 'Something went wrong! Be sure the new password ' + 'for {} has at least {} characters and is ' + 'not too common.' + ).format(user_name, pw_len) html = await self.render_template( - 'change-password.html', - user_name=user_name, - result_message=msg, - alert=alert + 'change-password.html', user_name=user_name, result_message=msg, alert=alert ) self.finish(html) class LoginHandler(LoginHandler, LocalBase): - def _render(self, login_error=None, username=None): return self.render_template( 'native-login.html', @@ -351,13 +365,13 @@ async def post(self): # and is just not authorised nuser = self.authenticator.get_user(data['username']) if nuser is not None: - if (nuser.is_valid_password(data['password']) - and not nuser.is_authorized): + if ( + nuser.is_valid_password(data['password']) + and not nuser.is_authorized + ): error = 'User has not been authorized by administrator yet' - html = await self._render( - login_error=error, username=data['username'] - ) + html = await self._render(login_error=error, username=data['username']) self.finish(html) diff --git a/nativeauthenticator/nativeauthenticator.py b/nativeauthenticator/nativeauthenticator.py index 325445e..2a40ac2 100644 --- a/nativeauthenticator/nativeauthenticator.py +++ b/nativeauthenticator/nativeauthenticator.py @@ -36,108 +36,123 @@ class NativeAuthenticator(Authenticator): COMMON_PASSWORDS = None recaptcha_key = Unicode( config=True, - help=("Your key to enable reCAPTCHA as described at " - "https://developers.google.com/recaptcha/intro") + help=( + "Your key to enable reCAPTCHA as described at " + "https://developers.google.com/recaptcha/intro" + ), ).tag(default=None) recaptcha_secret = Unicode( config=True, - help=("Your secret to enable reCAPTCHA as described at " - "https://developers.google.com/recaptcha/intro") + help=( + "Your secret to enable reCAPTCHA as described at " + "https://developers.google.com/recaptcha/intro" + ), ).tag(default=None) tos = Unicode( config=True, - help=("The HTML to present next to the Term of Service " - "checkbox") + help=("The HTML to present next to the Term of Service " "checkbox"), ).tag(default=None) self_approval_server = Dict( config=True, - help=("SMTP server information as a dictionary of 'url', 'usr'" - "and 'pwd' to use for sending email, e.g." - "self_approval_server={'url': 'smtp.gmail.com', 'usr': 'myself'" - "'pwd': 'mypassword'}") + help=( + "SMTP server information as a dictionary of 'url', 'usr'" + "and 'pwd' to use for sending email, e.g." + "self_approval_server={'url': 'smtp.gmail.com', 'usr': 'myself'" + "'pwd': 'mypassword'}" + ), ).tag(default=None) secret_key = Unicode( config=True, - help=("Secret key to cryptographically sign the " - "self-approved URL (if allow_self_approval is utilized)") + help=( + "Secret key to cryptographically sign the " + "self-approved URL (if allow_self_approval is utilized)" + ), ).tag(default="") allow_self_approval_for = Unicode( allow_none=True, config=True, - help=("Use self-service authentication (rather than " - "admin-based authentication) for users whose " - "email match this patter. Note that this forces " - "ask_email_on_signup to be True.") + help=( + "Use self-service authentication (rather than " + "admin-based authentication) for users whose " + "email match this patter. Note that this forces " + "ask_email_on_signup to be True." + ), ).tag(default=None) self_approval_email = Tuple( - Unicode(), Unicode(), Unicode(), + Unicode(), + Unicode(), + Unicode(), config=True, - default_value=("do-not-reply@my-domain.com", - "Welcome to JupyterHub on my-domain", - ("Your JupyterHub account on my-domain has been " - "created, but it's inactive.\n" - "If you did not create the account yourself, " - "IGNORE this message:\n" - "somebody is trying to use your email to get an " - "unathorized account!\n" - "If you did create the account yourself, navigate " - "to {approval_url} to activate it.\n")) - + default_value=( + "do-not-reply@my-domain.com", + "Welcome to JupyterHub on my-domain", + ( + "Your JupyterHub account on my-domain has been " + "created, but it's inactive.\n" + "If you did not create the account yourself, " + "IGNORE this message:\n" + "somebody is trying to use your email to get an " + "unathorized account!\n" + "If you did create the account yourself, navigate " + "to {approval_url} to activate it.\n" + ), + ), ) check_common_password = Bool( config=True, - help=("Creates a verification of password strength " - "when a new user makes signup") + help=( + "Creates a verification of password strength " + "when a new user makes signup" + ), ).tag(default=False) minimum_password_length = Integer( config=True, - help=("Check if the length of the password is at least this size on " - "signup") + help=("Check if the length of the password is at least this size on " "signup"), ).tag(default=1) allowed_failed_logins = Integer( config=True, - help=("Configures the number of failed attempts a user can have " - "before being blocked.") + help=( + "Configures the number of failed attempts a user can have " + "before being blocked." + ), ).tag(default=0) seconds_before_next_try = Integer( config=True, - help=("Configures the number of seconds a user has to wait " - "after being blocked. Default is 600.") + help=( + "Configures the number of seconds a user has to wait " + "after being blocked. Default is 600." + ), ).tag(default=600) enable_signup = Bool( config=True, default_value=True, - help=("Allows every user to registry a new account") + help=("Allows every user to registry a new account"), ) open_signup = Bool( config=True, default_value=False, - help=("Allows every user that made sign up to automatically log in " - "the system without needing admin authorization") + help=( + "Allows every user that made sign up to automatically log in " + "the system without needing admin authorization" + ), ) - ask_email_on_signup = Bool( - False, - config=True, - help="Asks for email on signup" - ) + ask_email_on_signup = Bool(False, config=True, help="Asks for email on signup") import_from_firstuse = Bool( - False, - config=True, - help="Import users from FirstUse Authenticator database" + False, config=True, help="Import users from FirstUse Authenticator database" ) firstuse_db_path = Unicode( @@ -145,20 +160,16 @@ class NativeAuthenticator(Authenticator): config=True, help=""" Path to store the db file of FirstUse with username / pwd hash in - """ + """, ) delete_firstuse_db_after_import = Bool( config=True, default_value=False, - help="Deletes FirstUse Authenticator database after the import" + help="Deletes FirstUse Authenticator database after the import", ) - allow_2fa = Bool( - False, - config=True, - help="" - ) + allow_2fa = Bool(False, config=True, help="") def __init__(self, add_new_table=True, *args, **kwargs): super().__init__(*args, **kwargs) @@ -178,8 +189,10 @@ def setup_self_approval(self): self.log.error("self_approval and open_signup are conflicts!") self.ask_email_on_signup = True if len(self.secret_key) < 8: - raise ValueError("Secret_key must be a random string of " - "len > 8 when using self_approval") + raise ValueError( + "Secret_key must be a random string of " + "len > 8 when using self_approval" + ) def add_new_table(self): inspector = inspect(self.db.bind) @@ -188,8 +201,7 @@ def add_new_table(self): def add_login_attempt(self, username): if not self.login_attempts.get(username): - self.login_attempts[username] = {'count': 1, - 'time': datetime.now()} + self.login_attempts[username] = {'count': 1, 'time': datetime.now()} else: self.login_attempts[username]['count'] += 1 self.login_attempts[username]['time'] = datetime.now() @@ -232,10 +244,7 @@ def authenticate(self, handler, data): if self.is_blocked(username): return - validations = [ - user.is_authorized, - user.is_valid_password(password) - ] + validations = [user.is_authorized, user.is_valid_password(password)] if user.has_2fa: validations.append(user.is_valid_token(data.get('2fa'))) @@ -247,8 +256,7 @@ def authenticate(self, handler, data): def is_password_common(self, password): common_credentials_file = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - 'common-credentials.txt' + os.path.dirname(os.path.abspath(__file__)), 'common-credentials.txt' ) if not self.COMMON_PASSWORDS: with open(common_credentials_file) as f: @@ -295,8 +303,7 @@ def create_user(self, username, pw, **kwargs): if self.user_exists(username): return - if not self.is_password_strong(pw) or \ - not self.validate_username(username): + if not self.is_password_strong(pw) or not self.validate_username(username): return if not self.enable_signup: @@ -329,8 +336,7 @@ def generate_approval_url(self, username, when=None): if when is None: when = datetime.now(tz.utc) + timedelta(minutes=15) s = Signer(self.secret_key) - u = s.sign_object({"username": username, - "expire": when.isoformat()}) + u = s.sign_object({"username": username, "expire": when.isoformat()}) return "/confirm/" + u def send_approval_email(self, dest, url): @@ -342,18 +348,21 @@ def send_approval_email(self, dest, url): try: if self.self_approval_server: s = smtplib.SMTP_SSL(self.self_approval_server['url']) - s.login(self.self_approval_server['usr'], - self.self_approval_server['pwd']) + s.login( + self.self_approval_server['usr'], self.self_approval_server['pwd'] + ) else: s = smtplib.SMTP('localhost') s.send_message(msg) s.quit() except Exception as e: self.log.error(e) - raise web.HTTPError(503, - reason="Self-authorization email could not " + - "be sent. Please contact the jupyterhub " + - "admin about this.") + raise web.HTTPError( + 503, + reason="Self-authorization email could not " + + "be sent. Please contact the jupyterhub " + + "admin about this.", + ) def get_unauthed_amount(self): unauthed = 0 @@ -423,7 +432,9 @@ def add_data_from_firstuse(self): if not new_user: error = '''User {} was not created. Check password restrictions or username problems before trying - again'''.format(user) + again'''.format( + user + ) raise ValueError(error) if self.delete_firstuse_db_after_import: diff --git a/nativeauthenticator/orm.py b/nativeauthenticator/orm.py index f30d5d9..dd45ce7 100644 --- a/nativeauthenticator/orm.py +++ b/nativeauthenticator/orm.py @@ -57,8 +57,7 @@ def change_authorization(cls, db, username): def validate_email(self, key, address): if not address: return - assert re.match(r"^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$", - address) + assert re.match(r"^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$", address) return address def is_valid_token(self, token): diff --git a/nativeauthenticator/tests/test_authenticator.py b/nativeauthenticator/tests/test_authenticator.py index f704cc1..dc81256 100644 --- a/nativeauthenticator/tests/test_authenticator.py +++ b/nativeauthenticator/tests/test_authenticator.py @@ -30,19 +30,21 @@ def app(): pytestmark = pytestmark(pytest.mark.usefixtures("tmpcwd")) -@pytest.mark.parametrize("is_admin,open_signup,expected_authorization", [ - (False, False, False), - (True, False, True), - (False, True, True), - (True, True, True) -]) -async def test_create_user(is_admin, open_signup, expected_authorization, - tmpcwd, app): - '''Test method create_user for new user and authorization ''' +@pytest.mark.parametrize( + "is_admin,open_signup,expected_authorization", + [ + (False, False, False), + (True, False, True), + (False, True, True), + (True, True, True), + ], +) +async def test_create_user(is_admin, open_signup, expected_authorization, tmpcwd, app): + '''Test method create_user for new user and authorization''' auth = NativeAuthenticator(db=app.db) if is_admin: - auth.admin_users = ({'johnsnow'}) + auth.admin_users = {'johnsnow'} if open_signup: auth.open_signup = True @@ -117,14 +119,18 @@ async def test_get_unauthed_amount(tmpcwd, app): assert auth.get_unauthed_amount() == 1 -@pytest.mark.parametrize("password,min_len,expected", [ - ("qwerty", 1, False), - ("agameofthrones", 1, True), - ("agameofthrones", 15, False), - ("averyveryverylongpassword", 15, True), -]) -async def test_create_user_with_strong_passwords(password, min_len, expected, - tmpcwd, app): +@pytest.mark.parametrize( + "password,min_len,expected", + [ + ("qwerty", 1, False), + ("agameofthrones", 1, True), + ("agameofthrones", 15, False), + ("averyveryverylongpassword", 15, True), + ], +) +async def test_create_user_with_strong_passwords( + password, min_len, expected, tmpcwd, app +): '''Test if method create_user and strong passwords mesh''' auth = NativeAuthenticator(db=app.db) auth.check_common_password = True @@ -168,12 +174,14 @@ async def test_no_change_to_bad_password(tmpcwd, app): assert auth.get_user('johnsnow').is_valid_password('DaenerysTargaryen') -@pytest.mark.parametrize("enable_signup,expected_success", [ - (True, True), - (False, False), -]) -async def test_create_user_disable(enable_signup, expected_success, - tmpcwd, app): +@pytest.mark.parametrize( + "enable_signup,expected_success", + [ + (True, True), + (False, False), + ], +) +async def test_create_user_disable(enable_signup, expected_success, tmpcwd, app): '''Test method get_or_create_user not create user if signup is disabled''' auth = NativeAuthenticator(db=app.db) auth.enable_signup = enable_signup @@ -186,22 +194,25 @@ async def test_create_user_disable(enable_signup, expected_success, assert not user -@pytest.mark.parametrize("username,password,authorized,expected", [ - ("name", '123', False, False), - ("johnsnow", '123', True, False), - ("Snow", 'password', True, False), - ("johnsnow", 'password', False, False), - ("johnsnow", 'password', True, True), -]) -async def test_authentication(username, password, authorized, expected, - tmpcwd, app): +@pytest.mark.parametrize( + "username,password,authorized,expected", + [ + ("name", '123', False, False), + ("johnsnow", '123', True, False), + ("Snow", 'password', True, False), + ("johnsnow", 'password', False, False), + ("johnsnow", 'password', True, True), + ], +) +async def test_authentication(username, password, authorized, expected, tmpcwd, app): '''Test if authentication fails with a unexistent user''' auth = NativeAuthenticator(db=app.db) auth.create_user('johnsnow', 'password') if authorized: UserInfo.change_authorization(app.db, 'johnsnow') - response = await auth.authenticate(app, {'username': username, - 'password': password}) + response = await auth.authenticate( + app, {'username': username, 'password': password} + ) assert bool(response) == expected @@ -317,10 +328,13 @@ async def test_import_from_firstuse_delete_db_after(tmpcwd, app): assert ('passwords.dbm' not in files) and ('passwords.dbm.db' not in files) -@pytest.mark.parametrize("user,pwd", [ - ('user1', 'password'), - ('user 1', 'somethingelsereallysecure'), -]) +@pytest.mark.parametrize( + "user,pwd", + [ + ('user1', 'password'), + ('user 1', 'somethingelsereallysecure'), + ], +) async def test_import_from_firstuse_invalid_password(user, pwd, tmpcwd, app): with dbm.open('passwords.dbm', 'c', 0o600) as db: db[user] = pwd diff --git a/nativeauthenticator/tests/test_orm.py b/nativeauthenticator/tests/test_orm.py index fcb5aa0..d530f60 100644 --- a/nativeauthenticator/tests/test_orm.py +++ b/nativeauthenticator/tests/test_orm.py @@ -32,9 +32,11 @@ def test_validate_method_correct_email(tmpdir, app): def test_all_users(tmpdir, app): assert len(UserInfo.all_users(app.db)) == 1 - user = UserInfo(username='daenerystargaryen', - password=b'yesispeakvalyrian', - email='khaleesi@valyria.com') + user = UserInfo( + username='daenerystargaryen', + password=b'yesispeakvalyrian', + email='khaleesi@valyria.com', + ) app.db.add(user) app.db.commit() From 4096784e3a24158298b5c41e478a361aabd495c4 Mon Sep 17 00:00:00 2001 From: Erik Sundell Date: Sun, 31 Oct 2021 12:20:51 +0100 Subject: [PATCH 3/4] pre-commit: manual string formatting tweaks --- docs/quickstart.rst | 2 +- nativeauthenticator/crypto/encoding.py | 4 +--- nativeauthenticator/handlers.py | 16 +++++++--------- nativeauthenticator/nativeauthenticator.py | 8 ++++---- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/docs/quickstart.rst b/docs/quickstart.rst index c3b08ed..911bdab 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -41,7 +41,7 @@ Lastly, you need to add the following to the configuration file as well: .. code-block:: python import os, nativeauthenticator - c.JupyterHub.template_paths = ["{}/templates/".format(os.path.dirname(nativeauthenticator.__file__))] + c.JupyterHub.template_paths = [f"{os.path.dirname(nativeauthenticator.__file__}/templates/"] Now you can run JupyterHub using the updated configuration file and start using JupyterHub with NativeAuthenticator: diff --git a/nativeauthenticator/crypto/encoding.py b/nativeauthenticator/crypto/encoding.py index 28233be..34ad88d 100644 --- a/nativeauthenticator/crypto/encoding.py +++ b/nativeauthenticator/crypto/encoding.py @@ -11,9 +11,7 @@ def __init__(self, obj, *args): super().__init__(*args) def __str__(self): - return '{}. You passed in {!r} ({})'.format( - super().__str__(), self.obj, type(self.obj) - ) + return f'{super().__str__()}. You passed in {self.obj!r} ({type(self.obj)})' _PROTECTED_TYPES = ( diff --git a/nativeauthenticator/handlers.py b/nativeauthenticator/handlers.py index 683a3b6..e916c71 100644 --- a/nativeauthenticator/handlers.py +++ b/nativeauthenticator/handlers.py @@ -81,9 +81,9 @@ def get_result_message(self, user, taken, human=True): message = ( "Something went wrong. Be sure your username " "does not contain spaces or commas, your " - "password has at least {} characters and is " + f"password has at least {pw_len} characters and is " "not too common." - ).format(pw_len) + ) else: message = ( "Something went wrong. Be sure your username " @@ -277,9 +277,9 @@ async def post(self): pw_len = self.authenticator.minimum_password_length msg = ( 'Something went wrong! Be sure your new ' - 'password has at least {} characters and is ' + f'password has at least {pw_len} characters and is ' 'not too common.' - ).format(pw_len) + ) html = await self.render_template( 'change-password.html', user_name=user.name, result_message=msg, alert=alert @@ -307,17 +307,15 @@ async def post(self, user_name): if success: alert = 'alert-success' - msg = ('The password for {} has been changed ' 'successfully').format( - user_name - ) + msg = f'The password for {user_name} has been changed successfully' else: alert = 'alert-danger' pw_len = self.authenticator.minimum_password_length msg = ( 'Something went wrong! Be sure the new password ' - 'for {} has at least {} characters and is ' + f'for {user_name} has at least {pw_len} characters and is ' 'not too common.' - ).format(user_name, pw_len) + ) html = await self.render_template( 'change-password.html', user_name=user_name, result_message=msg, alert=alert diff --git a/nativeauthenticator/nativeauthenticator.py b/nativeauthenticator/nativeauthenticator.py index 2a40ac2..f9ba290 100644 --- a/nativeauthenticator/nativeauthenticator.py +++ b/nativeauthenticator/nativeauthenticator.py @@ -430,10 +430,10 @@ def add_data_from_firstuse(self): password = db[user].decode() new_user = self.create_user(user.decode(), password) if not new_user: - error = '''User {} was not created. Check password - restrictions or username problems before trying - again'''.format( - user + error = ( + f'User {user} was not created. Check password ' + 'restrictions or username problems before trying ' + 'again.' ) raise ValueError(error) From 45691153fb84effad6431d5d882b3b26dec07082 Mon Sep 17 00:00:00 2001 From: Erik Sundell Date: Sun, 31 Oct 2021 12:21:19 +0100 Subject: [PATCH 4/4] pre-commit: run black hook with string normalization --- dev-jupyterhub_config.py | 2 +- docs/conf.py | 50 ++--- nativeauthenticator/crypto/crypto.py | 8 +- nativeauthenticator/crypto/encoding.py | 26 +-- nativeauthenticator/crypto/signing.py | 50 ++--- nativeauthenticator/handlers.py | 112 +++++----- nativeauthenticator/nativeauthenticator.py | 68 +++--- nativeauthenticator/orm.py | 6 +- .../tests/test_authenticator.py | 200 +++++++++--------- nativeauthenticator/tests/test_orm.py | 18 +- setup.py | 16 +- 11 files changed, 278 insertions(+), 278 deletions(-) diff --git a/dev-jupyterhub_config.py b/dev-jupyterhub_config.py index a21d5dc..593974e 100644 --- a/dev-jupyterhub_config.py +++ b/dev-jupyterhub_config.py @@ -48,5 +48,5 @@ # } c.NativeAuthenticator.import_from_firstuse = False -c.NativeAuthenticator.firstuse_dbm_path = '/home/user/passwords.dbm' +c.NativeAuthenticator.firstuse_dbm_path = "/home/user/passwords.dbm" c.NativeAuthenticator.delete_firstuse_db_after_import = False diff --git a/docs/conf.py b/docs/conf.py index 2f7dddc..5ef301f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -14,14 +14,14 @@ # sys.path.insert(0, os.path.abspath('.')) # -- Project information ----------------------------------------------------- -project = 'Native Authenticator' -copyright = '2021, Leticia Portella' -author = 'Leticia Portella' +project = "Native Authenticator" +copyright = "2021, Leticia Portella" +author = "Leticia Portella" # The short X.Y version -version = '' +version = "" # The full version, including alpha/beta/rc tags -release = '' +release = "" # -- General configuration --------------------------------------------------- @@ -36,16 +36,16 @@ extensions = [] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # The suffix(es) of source filenames. # You can specify multiple suffix as a list of string: # # source_suffix = ['.rst', '.md'] -source_suffix = '.rst' +source_suffix = ".rst" # The master toctree document. -master_doc = 'index' +master_doc = "index" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -57,10 +57,10 @@ # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This pattern also affects html_static_path and html_extra_path. -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # -- Options for HTML output ------------------------------------------------- @@ -68,7 +68,7 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'alabaster' +html_theme = "alabaster" # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the @@ -79,7 +79,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] # Custom sidebar templates, must be a dictionary that maps document names # to template names. @@ -95,7 +95,7 @@ # -- Options for HTMLHelp output --------------------------------------------- # Output file base name for HTML help builder. -htmlhelp_basename = 'NativeAuthenticatordoc' +htmlhelp_basename = "NativeAuthenticatordoc" # -- Options for LaTeX output ------------------------------------------------ @@ -121,10 +121,10 @@ latex_documents = [ ( master_doc, - 'NativeAuthenticator.tex', - 'Native Authenticator Documentation', - 'Leticia Portella', - 'manual', + "NativeAuthenticator.tex", + "Native Authenticator Documentation", + "Leticia Portella", + "manual", ), ] @@ -136,8 +136,8 @@ man_pages = [ ( master_doc, - 'nativeauthenticator', - 'Native Authenticator Documentation', + "nativeauthenticator", + "Native Authenticator Documentation", [author], 1, ) @@ -152,12 +152,12 @@ texinfo_documents = [ ( master_doc, - 'NativeAuthenticator', - 'Native Authenticator Documentation', + "NativeAuthenticator", + "Native Authenticator Documentation", author, - 'NativeAuthenticator', - 'One line description of project.', - 'Miscellaneous', + "NativeAuthenticator", + "One line description of project.", + "Miscellaneous", ), ] @@ -177,4 +177,4 @@ # epub_uid = '' # A list of files that should not be packed into the epub file. -epub_exclude_files = ['search.html'] +epub_exclude_files = ["search.html"] diff --git a/nativeauthenticator/crypto/crypto.py b/nativeauthenticator/crypto/crypto.py index 0858c00..d0cddb7 100644 --- a/nativeauthenticator/crypto/crypto.py +++ b/nativeauthenticator/crypto/crypto.py @@ -14,7 +14,7 @@ class InvalidAlgorithm(ValueError): pass -def salted_hmac(key_salt, value, secret, *, algorithm='sha1'): +def salted_hmac(key_salt, value, secret, *, algorithm="sha1"): """ Return the HMAC of 'value', using a key generated from key_salt and a secret. Default algorithm is SHA1, @@ -29,7 +29,7 @@ def salted_hmac(key_salt, value, secret, *, algorithm='sha1'): hasher = getattr(hashlib, algorithm) except AttributeError as e: raise InvalidAlgorithm( - '%r is not an algorithm accepted by the hashlib module.' % algorithm + "%r is not an algorithm accepted by the hashlib module." % algorithm ) from e # We need to generate a derived key from our base key. We can do this by # passing the key_salt and our base key through a pseudo-random function. @@ -41,7 +41,7 @@ def salted_hmac(key_salt, value, secret, *, algorithm='sha1'): return hmac.new(key, msg=force_bytes(value), digestmod=hasher) -RANDOM_STRING_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' +RANDOM_STRING_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" def get_random_string(length, allowed_chars=RANDOM_STRING_CHARS): @@ -55,7 +55,7 @@ def get_random_string(length, allowed_chars=RANDOM_STRING_CHARS): * length: 12, bit length =~ 71 bits * length: 22, bit length =~ 131 bits """ - return ''.join(secrets.choice(allowed_chars) for i in range(length)) + return "".join(secrets.choice(allowed_chars) for i in range(length)) def constant_time_compare(val1, val2): diff --git a/nativeauthenticator/crypto/encoding.py b/nativeauthenticator/crypto/encoding.py index 34ad88d..10d085e 100644 --- a/nativeauthenticator/crypto/encoding.py +++ b/nativeauthenticator/crypto/encoding.py @@ -11,7 +11,7 @@ def __init__(self, obj, *args): super().__init__(*args) def __str__(self): - return f'{super().__str__()}. You passed in {self.obj!r} ({type(self.obj)})' + return f"{super().__str__()}. You passed in {self.obj!r} ({type(self.obj)})" _PROTECTED_TYPES = ( @@ -34,7 +34,7 @@ def is_protected_type(obj): return isinstance(obj, _PROTECTED_TYPES) -def force_str(s, encoding='utf-8', strings_only=False, errors='strict'): +def force_str(s, encoding="utf-8", strings_only=False, errors="strict"): """ Similar to smart_str(), except that lazy instances are resolved to strings, rather than kept as lazy objects. @@ -56,7 +56,7 @@ def force_str(s, encoding='utf-8', strings_only=False, errors='strict'): return s -def force_bytes(s, encoding='utf-8', strings_only=False, errors='strict'): +def force_bytes(s, encoding="utf-8", strings_only=False, errors="strict"): """ Similar to smart_bytes, except that lazy instances are resolved to strings, rather than kept as lazy objects. @@ -65,10 +65,10 @@ def force_bytes(s, encoding='utf-8', strings_only=False, errors='strict'): """ # Handle the common case first for performance reasons. if isinstance(s, bytes): - if encoding == 'utf-8': + if encoding == "utf-8": return s else: - return s.decode('utf-8', errors).encode(encoding, errors) + return s.decode("utf-8", errors).encode(encoding, errors) if strings_only and is_protected_type(s): return s if isinstance(s, memoryview): @@ -83,11 +83,11 @@ def force_bytes(s, encoding='utf-8', strings_only=False, errors='strict'): (fmt % char).encode(): bytes((char,)) for ascii_range in _ascii_ranges for char in ascii_range - for fmt in ['%02x', '%02X'] + for fmt in ["%02x", "%02X"] } # And then everything above 128, because bytes ≥ 128 are part of multibyte # Unicode characters. -_hexdig = '0123456789ABCDEFabcdef' +_hexdig = "0123456789ABCDEFabcdef" _hextobyte.update( {(a + b).encode(): bytes.fromhex(a + b) for a in _hexdig[8:] for b in _hexdig} ) @@ -110,7 +110,7 @@ def uri_to_iri(uri): # second block, decode the first 2 bytes if they represent a hex code to # decode. The rest of the block is the part after '%AB', not containing # any '%'. Add that to the output without further processing. - bits = uri.split(b'%') + bits = uri.split(b"%") if len(bits) == 1: iri = uri else: @@ -123,9 +123,9 @@ def uri_to_iri(uri): append(hextobyte[item[:2]]) append(item[2:]) else: - append(b'%') + append(b"%") append(item) - iri = b''.join(parts) + iri = b"".join(parts) return repercent_broken_unicode(iri).decode() @@ -148,7 +148,7 @@ def escape_uri_path(path): def punycode(domain): """Return the Punycode of the given domain if it's non-ASCII.""" - return domain.encode('idna').decode('ascii') + return domain.encode("idna").decode("ascii") def repercent_broken_unicode(path): @@ -191,10 +191,10 @@ def get_system_encoding(): #10335 and #5846. """ try: - encoding = locale.getdefaultlocale()[1] or 'ascii' + encoding = locale.getdefaultlocale()[1] or "ascii" codecs.lookup(encoding) except Exception: - encoding = 'ascii' + encoding = "ascii" return encoding diff --git a/nativeauthenticator/crypto/signing.py b/nativeauthenticator/crypto/signing.py index 74bc1d5..0f2f994 100644 --- a/nativeauthenticator/crypto/signing.py +++ b/nativeauthenticator/crypto/signing.py @@ -42,8 +42,8 @@ from .crypto import constant_time_compare from .crypto import salted_hmac -_SEP_UNSAFE = re.compile(r'^[A-z0-9-_=]*$') -BASE62_ALPHABET = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' +_SEP_UNSAFE = re.compile(r"^[A-z0-9-_=]*$") +BASE62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" class BadSignature(Exception): @@ -60,10 +60,10 @@ class SignatureExpired(BadSignature): def b62_encode(s): if s == 0: - return '0' - sign = '-' if s < 0 else '' + return "0" + sign = "-" if s < 0 else "" s = abs(s) - encoded = '' + encoded = "" while s > 0: s, remainder = divmod(s, 62) encoded = BASE62_ALPHABET[remainder] + encoded @@ -71,10 +71,10 @@ def b62_encode(s): def b62_decode(s): - if s == '0': + if s == "0": return 0 sign = 1 - if s[0] == '-': + if s[0] == "-": s = s[1:] sign = -1 decoded = 0 @@ -84,15 +84,15 @@ def b62_decode(s): def b64_encode(s): - return base64.urlsafe_b64encode(s).strip(b'=') + return base64.urlsafe_b64encode(s).strip(b"=") def b64_decode(s): - pad = b'=' * (-len(s) % 4) + pad = b"=" * (-len(s) % 4) return base64.urlsafe_b64decode(s + pad) -def base64_hmac(salt, value, key, algorithm='sha1'): +def base64_hmac(salt, value, key, algorithm="sha1"): return b64_encode( salted_hmac(salt, value, key, algorithm=algorithm).digest() ).decode() @@ -105,14 +105,14 @@ class JSONSerializer: """ def dumps(self, obj): - return json.dumps(obj, separators=(',', ':')).encode('latin-1') + return json.dumps(obj, separators=(",", ":")).encode("latin-1") def loads(self, data): - return json.loads(data.decode('latin-1')) + return json.loads(data.decode("latin-1")) def dumps( - obj, key=None, salt='django.core.signing', serializer=JSONSerializer, compress=False + obj, key=None, salt="django.core.signing", serializer=JSONSerializer, compress=False ): """ Return URL-safe, hmac signed base64 compressed JSON string. If key is @@ -136,7 +136,7 @@ def dumps( def loads( - s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None + s, key=None, salt="django.core.signing", serializer=JSONSerializer, max_age=None ): """ Reverse of dumps(), raise BadSignature if signature fails. @@ -149,26 +149,26 @@ def loads( class Signer: - def __init__(self, key, sep=':', salt=None, algorithm=None): + def __init__(self, key, sep=":", salt=None, algorithm=None): self.key = key self.sep = sep if _SEP_UNSAFE.match(self.sep): raise ValueError( - 'Unsafe Signer separator: %r (cannot be empty or consist of ' - 'only A-z0-9-_=)' % sep, + "Unsafe Signer separator: %r (cannot be empty or consist of " + "only A-z0-9-_=)" % sep, ) - self.salt = salt or '{}.{}'.format( + self.salt = salt or "{}.{}".format( self.__class__.__module__, self.__class__.__name__ ) - self.algorithm = algorithm or 'sha256' + self.algorithm = algorithm or "sha256" def signature(self, value): return base64_hmac( - self.salt + 'signer', value, self.key, algorithm=self.algorithm + self.salt + "signer", value, self.key, algorithm=self.algorithm ) def sign(self, value): - return f'{value}{self.sep}{self.signature(value)}' + return f"{value}{self.sep}{self.signature(value)}" def unsign(self, signed_value): if self.sep not in signed_value: @@ -200,14 +200,14 @@ def sign_object(self, obj, serializer=JSONSerializer, compress=False): is_compressed = True base64d = b64_encode(data).decode() if is_compressed: - base64d = '.' + base64d + base64d = "." + base64d return self.sign(base64d) def unsign_object(self, signed_obj, serializer=JSONSerializer, **kwargs): # Signer.unsign() returns str but base64 and zlib compression operate # on bytes. base64d = self.unsign(signed_obj, **kwargs).encode() - decompress = base64d[:1] == b'.' + decompress = base64d[:1] == b"." if decompress: # It's compressed; uncompress it first. base64d = base64d[1:] @@ -222,7 +222,7 @@ def timestamp(self): return b62_encode(int(time.time())) def sign(self, value): - value = f'{value}{self.sep}{self.timestamp()}' + value = f"{value}{self.sep}{self.timestamp()}" return super().sign(value) def unsign(self, value, max_age=None): @@ -239,5 +239,5 @@ def unsign(self, value, max_age=None): # Check timestamp is not older than max_age age = time.time() - timestamp if age > max_age: - raise SignatureExpired(f'Signature age {age} > {max_age} seconds') + raise SignatureExpired(f"Signature age {age} > {max_age} seconds") return value diff --git a/nativeauthenticator/handlers.py b/nativeauthenticator/handlers.py index e916c71..4f584ce 100644 --- a/nativeauthenticator/handlers.py +++ b/nativeauthenticator/handlers.py @@ -25,7 +25,7 @@ from .orm import UserInfo -TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), 'templates') +TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "templates") class LocalBase(BaseHandler): @@ -34,9 +34,9 @@ class LocalBase(BaseHandler): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if not LocalBase._template_dir_registered: - self.log.debug('Adding %s to template path', TEMPLATE_DIR) + self.log.debug("Adding %s to template path", TEMPLATE_DIR) loader = FileSystemLoader([TEMPLATE_DIR]) - env = self.settings['jinja2_env'] + env = self.settings["jinja2_env"] previous_loader = env.loader env.loader = ChoiceLoader([previous_loader, loader]) LocalBase._template_dir_registered = True @@ -50,7 +50,7 @@ async def get(self): raise web.HTTPError(404) html = await self.render_template( - 'signup.html', + "signup.html", ask_email=self.authenticator.ask_email_on_signup, two_factor_auth=self.authenticator.allow_2fa, recaptcha_key=self.authenticator.recaptcha_key, @@ -59,14 +59,14 @@ async def get(self): self.finish(html) def get_result_message(self, user, taken, human=True): - alert = 'alert-info' - message = 'Your information has been sent to the admin' + alert = "alert-info" + message = "Your information has been sent to the admin" if user and user.login_email_sent: - message = 'Check your email to authorize your access' + message = "Check your email to authorize your access" # Always error if username is taken. if taken: - alert = 'alert-danger' + alert = "alert-danger" message = ( "Something went wrong. It appears that this " "username is already in use. Please try again " @@ -75,7 +75,7 @@ def get_result_message(self, user, taken, human=True): else: # Error if user creation was not successful. if not user: - alert = 'alert-danger' + alert = "alert-danger" pw_len = self.authenticator.minimum_password_length if pw_len: message = ( @@ -93,14 +93,14 @@ def get_result_message(self, user, taken, human=True): # If user creation went through & open-signup is enabled, success. elif self.authenticator.open_signup: - alert = 'alert-success' + alert = "alert-success" message = ( - 'The signup was successful. You can now go to ' - 'home page and log in the system' + "The signup was successful. You can now go to " + "home page and log in the system" ) if not human: - alert = 'alert-danger' + alert = "alert-danger" message = "You failed the reCAPTCHA. Please try again" return alert, message @@ -114,14 +114,14 @@ async def post(self): if self.authenticator.recaptcha_key: recaptcha_response = self.get_body_argument( - 'g-recaptcha-response', strip=True + "g-recaptcha-response", strip=True ) if recaptcha_response == "": assume_human = False else: data = { - 'secret': self.authenticator.recaptcha_secret, - 'response': recaptcha_response, + "secret": self.authenticator.recaptcha_secret, + "response": recaptcha_response, } validation_status = requests.post(url, data=data) assume_human = validation_status.json().get("success") @@ -132,12 +132,12 @@ async def post(self): if assume_human: user_info = { - 'username': self.get_body_argument('username', strip=False), - 'pw': self.get_body_argument('pw', strip=False), - 'email': self.get_body_argument('email', '', strip=False), - 'has_2fa': bool(self.get_body_argument('2fa', '', strip=False)), + "username": self.get_body_argument("username", strip=False), + "pw": self.get_body_argument("pw", strip=False), + "email": self.get_body_argument("email", "", strip=False), + "has_2fa": bool(self.get_body_argument("2fa", "", strip=False)), } - taken = self.authenticator.user_exists(user_info['username']) + taken = self.authenticator.user_exists(user_info["username"]) user = self.authenticator.create_user(**user_info) else: user = False @@ -145,13 +145,13 @@ async def post(self): alert, message = self.get_result_message(user, taken, assume_human) - otp_secret, user_2fa = '', '' + otp_secret, user_2fa = "", "" if user: otp_secret = user.otp_secret user_2fa = user.has_2fa html = await self.render_template( - 'signup.html', + "signup.html", ask_email=self.authenticator.ask_email_on_signup, result_message=message, alert=alert, @@ -170,7 +170,7 @@ class AuthorizationHandler(LocalBase): @admin_users_scope async def get(self): html = await self.render_template( - 'authorization-area.html', + "authorization-area.html", ask_email=self.authenticator.ask_email_on_signup, users=self.db.query(UserInfo).all(), ) @@ -181,7 +181,7 @@ class ChangeAuthorizationHandler(LocalBase): @admin_users_scope async def get(self, slug): UserInfo.change_authorization(self.db, slug) - self.redirect(self.hub.base_url + 'authorize#' + slug) + self.redirect(self.hub.base_url + "authorize#" + slug) class AuthorizeHandler(LocalBase): @@ -208,7 +208,7 @@ async def get(self, slug): # add POSIX user!! html = await self.render_template( - 'my_message.html', + "my_message.html", message=msg, ) self.finish(html) @@ -258,7 +258,7 @@ class ChangePasswordHandler(LocalBase): async def get(self): user = await self.get_current_user() html = await self.render_template( - 'change-password.html', + "change-password.html", user_name=user.name, ) self.finish(html) @@ -266,23 +266,23 @@ async def get(self): @web.authenticated async def post(self): user = await self.get_current_user() - new_password = self.get_body_argument('password', strip=False) + new_password = self.get_body_argument("password", strip=False) success = self.authenticator.change_password(user.name, new_password) if success: - alert = 'alert-success' - msg = 'Your password has been changed successfully!' + alert = "alert-success" + msg = "Your password has been changed successfully!" else: - alert = 'alert-danger' + alert = "alert-danger" pw_len = self.authenticator.minimum_password_length msg = ( - 'Something went wrong! Be sure your new ' - f'password has at least {pw_len} characters and is ' - 'not too common.' + "Something went wrong! Be sure your new " + f"password has at least {pw_len} characters and is " + "not too common." ) html = await self.render_template( - 'change-password.html', user_name=user.name, result_message=msg, alert=alert + "change-password.html", user_name=user.name, result_message=msg, alert=alert ) self.finish(html) @@ -295,30 +295,30 @@ async def get(self, user_name): if not self.authenticator.user_exists(user_name): raise web.HTTPError(404) html = await self.render_template( - 'change-password.html', + "change-password.html", user_name=user_name, ) self.finish(html) @admin_users_scope async def post(self, user_name): - new_password = self.get_body_argument('password', strip=False) + new_password = self.get_body_argument("password", strip=False) success = self.authenticator.change_password(user_name, new_password) if success: - alert = 'alert-success' - msg = f'The password for {user_name} has been changed successfully' + alert = "alert-success" + msg = f"The password for {user_name} has been changed successfully" else: - alert = 'alert-danger' + alert = "alert-danger" pw_len = self.authenticator.minimum_password_length msg = ( - 'Something went wrong! Be sure the new password ' - f'for {user_name} has at least {pw_len} characters and is ' - 'not too common.' + "Something went wrong! Be sure the new password " + f"for {user_name} has at least {pw_len} characters and is " + "not too common." ) html = await self.render_template( - 'change-password.html', user_name=user_name, result_message=msg, alert=alert + "change-password.html", user_name=user_name, result_message=msg, alert=alert ) self.finish(html) @@ -326,17 +326,17 @@ async def post(self, user_name): class LoginHandler(LoginHandler, LocalBase): def _render(self, login_error=None, username=None): return self.render_template( - 'native-login.html', - next=url_escape(self.get_argument('next', default='')), + "native-login.html", + next=url_escape(self.get_argument("next", default="")), username=username, login_error=login_error, custom_html=self.authenticator.custom_html, - login_url=self.settings['login_url'], + login_url=self.settings["login_url"], enable_signup=self.authenticator.enable_signup, two_factor_auth=self.authenticator.allow_2fa, authenticator_login_url=url_concat( self.authenticator.login_url(self.hub.base_url), - {'next': self.get_argument('next', '')}, + {"next": self.get_argument("next", "")}, ), ) @@ -346,7 +346,7 @@ async def post(self): for arg in self.request.arguments: data[arg] = self.get_argument(arg, strip=False) - auth_timer = self.statsd.timer('login.authenticate').start() + auth_timer = self.statsd.timer("login.authenticate").start() user = await self.login_user(data) auth_timer.stop(send=False) @@ -357,19 +357,19 @@ async def post(self): self.redirect(self.get_next_url(user)) else: # default error mesage on unsuccessful login - error = 'Invalid username or password' + error = "Invalid username or password" # check is user exists and has correct password, # and is just not authorised - nuser = self.authenticator.get_user(data['username']) + nuser = self.authenticator.get_user(data["username"]) if nuser is not None: if ( - nuser.is_valid_password(data['password']) + nuser.is_valid_password(data["password"]) and not nuser.is_authorized ): - error = 'User has not been authorized by administrator yet' + error = "User has not been authorized by administrator yet" - html = await self._render(login_error=error, username=data['username']) + html = await self._render(login_error=error, username=data["username"]) self.finish(html) @@ -382,11 +382,11 @@ async def get(self, user_name): if user is not None: if not user.is_authorized: # Delete user from NativeAuthenticator db table (users_info) - user = type('User', (), {'name': user_name}) + user = type("User", (), {"name": user_name}) self.authenticator.delete_user(user) # Also delete user from jupyterhub registry, if present if self.users.get(user_name) is not None: self.users.delete(user_name) - self.redirect(self.hub.base_url + 'authorize') + self.redirect(self.hub.base_url + "authorize") diff --git a/nativeauthenticator/nativeauthenticator.py b/nativeauthenticator/nativeauthenticator.py index f9ba290..3ba99f7 100644 --- a/nativeauthenticator/nativeauthenticator.py +++ b/nativeauthenticator/nativeauthenticator.py @@ -156,7 +156,7 @@ class NativeAuthenticator(Authenticator): ) firstuse_db_path = Unicode( - 'passwords.dbm', + "passwords.dbm", config=True, help=""" Path to store the db file of FirstUse with username / pwd hash in @@ -196,22 +196,22 @@ def setup_self_approval(self): def add_new_table(self): inspector = inspect(self.db.bind) - if 'users_info' not in inspector.get_table_names(): + if "users_info" not in inspector.get_table_names(): UserInfo.__table__.create(self.db.bind) def add_login_attempt(self, username): if not self.login_attempts.get(username): - self.login_attempts[username] = {'count': 1, 'time': datetime.now()} + self.login_attempts[username] = {"count": 1, "time": datetime.now()} else: - self.login_attempts[username]['count'] += 1 - self.login_attempts[username]['time'] = datetime.now() + self.login_attempts[username]["count"] += 1 + self.login_attempts[username]["time"] = datetime.now() def can_try_to_login_again(self, username): login_attempts = self.login_attempts.get(username) if not login_attempts: return True - time_last_attempt = datetime.now() - login_attempts['time'] + time_last_attempt = datetime.now() - login_attempts["time"] if time_last_attempt.seconds > self.seconds_before_next_try: return True @@ -220,7 +220,7 @@ def can_try_to_login_again(self, username): def is_blocked(self, username): logins = self.login_attempts.get(username) - if not logins or logins['count'] < self.allowed_failed_logins: + if not logins or logins["count"] < self.allowed_failed_logins: return False if self.can_try_to_login_again(username): @@ -233,8 +233,8 @@ def successful_login(self, username): @gen.coroutine def authenticate(self, handler, data): - username = self.normalize_username(data['username']) - password = data['password'] + username = self.normalize_username(data["username"]) + password = data["password"] user = self.get_user(username) if not user: @@ -246,7 +246,7 @@ def authenticate(self, handler, data): validations = [user.is_authorized, user.is_valid_password(password)] if user.has_2fa: - validations.append(user.is_valid_token(data.get('2fa'))) + validations.append(user.is_valid_token(data.get("2fa"))) if all(validations): self.successful_login(username) @@ -256,7 +256,7 @@ def authenticate(self, handler, data): def is_password_common(self, password): common_credentials_file = os.path.join( - os.path.dirname(os.path.abspath(__file__)), 'common-credentials.txt' + os.path.dirname(os.path.abspath(__file__)), "common-credentials.txt" ) if not self.COMMON_PASSWORDS: with open(common_credentials_file) as f: @@ -310,11 +310,11 @@ def create_user(self, username, pw, **kwargs): return encoded_pw = bcrypt.hashpw(pw.encode(), bcrypt.gensalt()) - infos = {'username': username, 'password': encoded_pw} + infos = {"username": username, "password": encoded_pw} infos.update(kwargs) if self.open_signup or username in self.get_authed_users(): - infos.update({'is_authorized': True}) + infos.update({"is_authorized": True}) try: user_info = UserInfo(**infos) @@ -341,18 +341,18 @@ def generate_approval_url(self, username, when=None): def send_approval_email(self, dest, url): msg = EmailMessage() - msg['From'] = self.self_approval_email[0] - msg['Subject'] = self.self_approval_email[1] + msg["From"] = self.self_approval_email[0] + msg["Subject"] = self.self_approval_email[1] msg.set_content(self.self_approval_email[2].format(approval_url=url)) - msg['To'] = dest + msg["To"] = dest try: if self.self_approval_server: - s = smtplib.SMTP_SSL(self.self_approval_server['url']) + s = smtplib.SMTP_SSL(self.self_approval_server["url"]) s.login( - self.self_approval_server['usr'], self.self_approval_server['pwd'] + self.self_approval_server["usr"], self.self_approval_server["pwd"] ) else: - s = smtplib.SMTP('localhost') + s = smtplib.SMTP("localhost") s.send_message(msg) s.quit() except Exception as e: @@ -386,22 +386,22 @@ def change_password(self, username, new_password): return True def validate_username(self, username): - invalid_chars = [',', ' '] + invalid_chars = [",", " "] if any((char in username) for char in invalid_chars): return False return super().validate_username(username) def get_handlers(self, app): native_handlers = [ - (r'/login', LoginHandler), - (r'/signup', SignUpHandler), - (r'/discard/([^/]*)', DiscardHandler), - (r'/authorize', AuthorizationHandler), - (r'/authorize/([^/]*)', ChangeAuthorizationHandler), + (r"/login", LoginHandler), + (r"/signup", SignUpHandler), + (r"/discard/([^/]*)", DiscardHandler), + (r"/authorize", AuthorizationHandler), + (r"/authorize/([^/]*)", ChangeAuthorizationHandler), # the following /confirm/ must be like in generate_approval_url() - (r'/confirm/([^/]*)', AuthorizeHandler), - (r'/change-password', ChangePasswordHandler), - (r'/change-password/([^/]+)', ChangePasswordAdminHandler), + (r"/confirm/([^/]*)", AuthorizeHandler), + (r"/change-password", ChangePasswordHandler), + (r"/change-password/([^/]+)", ChangePasswordAdminHandler), ] return native_handlers @@ -419,21 +419,21 @@ def delete_dbm_db(self): db_complete_path = str(db_path.absolute()) # necessary for BSD implementation of dbm lib - if os.path.exists(os.path.join(db_dir, db_name + '.db')): - os.remove(db_complete_path + '.db') + if os.path.exists(os.path.join(db_dir, db_name + ".db")): + os.remove(db_complete_path + ".db") else: os.remove(db_complete_path) def add_data_from_firstuse(self): - with dbm.open(self.firstuse_db_path, 'c', 0o600) as db: + with dbm.open(self.firstuse_db_path, "c", 0o600) as db: for user in db.keys(): password = db[user].decode() new_user = self.create_user(user.decode(), password) if not new_user: error = ( - f'User {user} was not created. Check password ' - 'restrictions or username problems before trying ' - 'again.' + f"User {user} was not created. Check password " + "restrictions or username problems before trying " + "again." ) raise ValueError(error) diff --git a/nativeauthenticator/orm.py b/nativeauthenticator/orm.py index dd45ce7..77b34b2 100644 --- a/nativeauthenticator/orm.py +++ b/nativeauthenticator/orm.py @@ -14,7 +14,7 @@ class UserInfo(Base): - __tablename__ = 'users_info' + __tablename__ = "users_info" id = Column(Integer, primary_key=True, autoincrement=True) username = Column(String(128), nullable=False) password = Column(LargeBinary, nullable=False) @@ -27,7 +27,7 @@ class UserInfo(Base): def __init__(self, **kwargs): super().__init__(**kwargs) if not self.otp_secret: - self.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8') + self.otp_secret = base64.b32encode(os.urandom(10)).decode("utf-8") @classmethod def find(cls, db, username): @@ -53,7 +53,7 @@ def change_authorization(cls, db, username): db.commit() return user - @validates('email') + @validates("email") def validate_email(self, key, address): if not address: return diff --git a/nativeauthenticator/tests/test_authenticator.py b/nativeauthenticator/tests/test_authenticator.py index dc81256..87de477 100644 --- a/nativeauthenticator/tests/test_authenticator.py +++ b/nativeauthenticator/tests/test_authenticator.py @@ -40,82 +40,82 @@ def app(): ], ) async def test_create_user(is_admin, open_signup, expected_authorization, tmpcwd, app): - '''Test method create_user for new user and authorization''' + """Test method create_user for new user and authorization""" auth = NativeAuthenticator(db=app.db) if is_admin: - auth.admin_users = {'johnsnow'} + auth.admin_users = {"johnsnow"} if open_signup: auth.open_signup = True - auth.create_user('johnsnow', 'password') - user_info = UserInfo.find(app.db, 'johnsnow') - assert user_info.username == 'johnsnow' + auth.create_user("johnsnow", "password") + user_info = UserInfo.find(app.db, "johnsnow") + assert user_info.username == "johnsnow" assert user_info.is_authorized == expected_authorization async def test_create_user_bad_characters(tmpcwd, app): - '''Test method create_user with bad characters on username''' + """Test method create_user with bad characters on username""" auth = NativeAuthenticator(db=app.db) - assert not auth.create_user('john snow', 'password') - assert not auth.create_user('john,snow', 'password') + assert not auth.create_user("john snow", "password") + assert not auth.create_user("john,snow", "password") async def test_create_user_twice(tmpcwd, app): - '''Test if creating users with an existing handle errors.''' + """Test if creating users with an existing handle errors.""" auth = NativeAuthenticator(db=app.db) # First creation should succeed. - assert auth.create_user('johnsnow', 'password') + assert auth.create_user("johnsnow", "password") # Creating the same account again should fail. - assert not auth.create_user('johnsnow', 'password') + assert not auth.create_user("johnsnow", "password") # Creating a user with same handle but different pw should also fail. - assert not auth.create_user('johnsnow', 'adifferentpassword') + assert not auth.create_user("johnsnow", "adifferentpassword") async def test_get_authed_users(tmpcwd, app): - '''Test if get_authed_users returns the proper set of users.''' + """Test if get_authed_users returns the proper set of users.""" auth = NativeAuthenticator(db=app.db) auth.admin_users = set() assert auth.get_authed_users() == set() - auth.create_user('johnsnow', 'password') + auth.create_user("johnsnow", "password") assert auth.get_authed_users() == set() - UserInfo.change_authorization(app.db, 'johnsnow') - assert auth.get_authed_users() == set({'johnsnow'}) + UserInfo.change_authorization(app.db, "johnsnow") + assert auth.get_authed_users() == set({"johnsnow"}) - auth.create_user('daenerystargaryen', 'anotherpassword') - assert auth.get_authed_users() == set({'johnsnow'}) + auth.create_user("daenerystargaryen", "anotherpassword") + assert auth.get_authed_users() == set({"johnsnow"}) - auth.admin_users = set({'daenerystargaryen'}) - assert 'johnsnow' in auth.get_authed_users() - assert 'daenerystargaryen' in auth.get_authed_users() + auth.admin_users = set({"daenerystargaryen"}) + assert "johnsnow" in auth.get_authed_users() + assert "daenerystargaryen" in auth.get_authed_users() async def test_get_unauthed_amount(tmpcwd, app): - '''Test if get_unauthed_amount returns the proper amount.''' + """Test if get_unauthed_amount returns the proper amount.""" auth = NativeAuthenticator(db=app.db) auth.admin_users = set() assert auth.get_unauthed_amount() == 0 - auth.create_user('johnsnow', 'password') + auth.create_user("johnsnow", "password") assert auth.get_unauthed_amount() == 1 - UserInfo.change_authorization(app.db, 'johnsnow') + UserInfo.change_authorization(app.db, "johnsnow") assert auth.get_unauthed_amount() == 0 - auth.create_user('daenerystargaryen', 'anotherpassword') + auth.create_user("daenerystargaryen", "anotherpassword") assert auth.get_unauthed_amount() == 1 - auth.create_user('tyrionlannister', 'yetanotherpassword') + auth.create_user("tyrionlannister", "yetanotherpassword") assert auth.get_unauthed_amount() == 2 - auth.admin_users = set({'daenerystargaryen'}) + auth.admin_users = set({"daenerystargaryen"}) assert auth.get_unauthed_amount() == 1 @@ -131,47 +131,47 @@ async def test_get_unauthed_amount(tmpcwd, app): async def test_create_user_with_strong_passwords( password, min_len, expected, tmpcwd, app ): - '''Test if method create_user and strong passwords mesh''' + """Test if method create_user and strong passwords mesh""" auth = NativeAuthenticator(db=app.db) auth.check_common_password = True auth.minimum_password_length = min_len - user = auth.create_user('johnsnow', password) + user = auth.create_user("johnsnow", password) assert bool(user) == expected async def test_change_password(tmpcwd, app): auth = NativeAuthenticator(db=app.db) - user = auth.create_user('johnsnow', 'password') - assert user.is_valid_password('password') - auth.change_password('johnsnow', 'newpassword') - assert not user.is_valid_password('password') - assert user.is_valid_password('newpassword') + user = auth.create_user("johnsnow", "password") + assert user.is_valid_password("password") + auth.change_password("johnsnow", "newpassword") + assert not user.is_valid_password("password") + assert user.is_valid_password("newpassword") async def test_no_change_to_bad_password(tmpcwd, app): - '''Test that changing password doesn't bypass password requirements''' + """Test that changing password doesn't bypass password requirements""" auth = NativeAuthenticator(db=app.db) auth.check_common_password = True auth.minimum_password_length = 8 - auth.create_user('johnsnow', 'ironwood') + auth.create_user("johnsnow", "ironwood") # Can't change password of nonexistent users. - assert auth.change_password('samwelltarly', 'palanquin') is None - assert auth.get_user('johnsnow').is_valid_password('ironwood') + assert auth.change_password("samwelltarly", "palanquin") is None + assert auth.get_user("johnsnow").is_valid_password("ironwood") # Can't change password to something too short. - assert auth.change_password('johnsnow', 'mummer') is None - assert auth.get_user('johnsnow').is_valid_password('ironwood') + assert auth.change_password("johnsnow", "mummer") is None + assert auth.get_user("johnsnow").is_valid_password("ironwood") # Can't change password to something too common. - assert auth.change_password('johnsnow', 'dragon') is None - assert auth.get_user('johnsnow').is_valid_password('ironwood') + assert auth.change_password("johnsnow", "dragon") is None + assert auth.get_user("johnsnow").is_valid_password("ironwood") # CAN change password to something fulfilling criteria. - assert auth.change_password('johnsnow', 'DaenerysTargaryen') is not None - assert not auth.get_user('johnsnow').is_valid_password('ironwood') - assert auth.get_user('johnsnow').is_valid_password('DaenerysTargaryen') + assert auth.change_password("johnsnow", "DaenerysTargaryen") is not None + assert not auth.get_user("johnsnow").is_valid_password("ironwood") + assert auth.get_user("johnsnow").is_valid_password("DaenerysTargaryen") @pytest.mark.parametrize( @@ -182,14 +182,14 @@ async def test_no_change_to_bad_password(tmpcwd, app): ], ) async def test_create_user_disable(enable_signup, expected_success, tmpcwd, app): - '''Test method get_or_create_user not create user if signup is disabled''' + """Test method get_or_create_user not create user if signup is disabled""" auth = NativeAuthenticator(db=app.db) auth.enable_signup = enable_signup - user = auth.create_user('johnsnow', 'password') + user = auth.create_user("johnsnow", "password") if expected_success: - assert user.username == 'johnsnow' + assert user.username == "johnsnow" else: assert not user @@ -197,66 +197,66 @@ async def test_create_user_disable(enable_signup, expected_success, tmpcwd, app) @pytest.mark.parametrize( "username,password,authorized,expected", [ - ("name", '123', False, False), - ("johnsnow", '123', True, False), - ("Snow", 'password', True, False), - ("johnsnow", 'password', False, False), - ("johnsnow", 'password', True, True), + ("name", "123", False, False), + ("johnsnow", "123", True, False), + ("Snow", "password", True, False), + ("johnsnow", "password", False, False), + ("johnsnow", "password", True, True), ], ) async def test_authentication(username, password, authorized, expected, tmpcwd, app): - '''Test if authentication fails with a unexistent user''' + """Test if authentication fails with a unexistent user""" auth = NativeAuthenticator(db=app.db) - auth.create_user('johnsnow', 'password') + auth.create_user("johnsnow", "password") if authorized: - UserInfo.change_authorization(app.db, 'johnsnow') + UserInfo.change_authorization(app.db, "johnsnow") response = await auth.authenticate( - app, {'username': username, 'password': password} + app, {"username": username, "password": password} ) assert bool(response) == expected async def test_handlers(app): - '''Test if all handlers are available on the Authenticator''' + """Test if all handlers are available on the Authenticator""" auth = NativeAuthenticator(db=app.db) handlers = auth.get_handlers(app) - assert handlers[0][0] == '/login' - assert handlers[1][0] == '/signup' - assert handlers[2][0] == '/discard/([^/]*)' - assert handlers[3][0] == '/authorize' - assert handlers[4][0] == '/authorize/([^/]*)' - assert handlers[5][0] == '/confirm/([^/]*)' - assert handlers[6][0] == '/change-password' - assert handlers[7][0] == '/change-password/([^/]+)' + assert handlers[0][0] == "/login" + assert handlers[1][0] == "/signup" + assert handlers[2][0] == "/discard/([^/]*)" + assert handlers[3][0] == "/authorize" + assert handlers[4][0] == "/authorize/([^/]*)" + assert handlers[5][0] == "/confirm/([^/]*)" + assert handlers[6][0] == "/change-password" + assert handlers[7][0] == "/change-password/([^/]+)" async def test_add_new_attempt_of_login(tmpcwd, app): auth = NativeAuthenticator(db=app.db) assert not auth.login_attempts - auth.add_login_attempt('username') - assert auth.login_attempts['username']['count'] == 1 - auth.add_login_attempt('username') - assert auth.login_attempts['username']['count'] == 2 + auth.add_login_attempt("username") + assert auth.login_attempts["username"]["count"] == 1 + auth.add_login_attempt("username") + assert auth.login_attempts["username"]["count"] == 2 async def test_authentication_login_count(tmpcwd, app): auth = NativeAuthenticator(db=app.db) - infos = {'username': 'johnsnow', 'password': 'password'} - wrong_infos = {'username': 'johnsnow', 'password': 'wrong_password'} - auth.create_user(infos['username'], infos['password']) - UserInfo.change_authorization(app.db, 'johnsnow') + infos = {"username": "johnsnow", "password": "password"} + wrong_infos = {"username": "johnsnow", "password": "wrong_password"} + auth.create_user(infos["username"], infos["password"]) + UserInfo.change_authorization(app.db, "johnsnow") assert not auth.login_attempts await auth.authenticate(app, wrong_infos) - assert auth.login_attempts['johnsnow']['count'] == 1 + assert auth.login_attempts["johnsnow"]["count"] == 1 await auth.authenticate(app, wrong_infos) - assert auth.login_attempts['johnsnow']['count'] == 2 + assert auth.login_attempts["johnsnow"]["count"] == 2 await auth.authenticate(app, infos) - assert not auth.login_attempts.get('johnsnow') + assert not auth.login_attempts.get("johnsnow") async def test_authentication_with_exceed_atempts_of_login(tmpcwd, app): @@ -264,15 +264,15 @@ async def test_authentication_with_exceed_atempts_of_login(tmpcwd, app): auth.allowed_failed_logins = 3 auth.secs_before_next_try = 10 - infos = {'username': 'johnsnow', 'password': 'wrongpassword'} - auth.create_user(infos['username'], 'password') - UserInfo.change_authorization(app.db, 'johnsnow') + infos = {"username": "johnsnow", "password": "wrongpassword"} + auth.create_user(infos["username"], "password") + UserInfo.change_authorization(app.db, "johnsnow") for i in range(3): response = await auth.authenticate(app, infos) assert not response - infos['password'] = 'password' + infos["password"] = "password" response = await auth.authenticate(app, infos) assert not response @@ -283,60 +283,60 @@ async def test_authentication_with_exceed_atempts_of_login(tmpcwd, app): async def test_get_user(tmpcwd, app): auth = NativeAuthenticator(db=app.db) - auth.create_user('johnsnow', 'password') + auth.create_user("johnsnow", "password") # Getting existing user is successful. - assert auth.get_user('johnsnow') is not None + assert auth.get_user("johnsnow") is not None # Getting non-existing user fails. - assert auth.get_user('samwelltarly') is None + assert auth.get_user("samwelltarly") is None async def test_delete_user(tmpcwd, app): auth = NativeAuthenticator(db=app.db) - auth.create_user('johnsnow', 'password') + auth.create_user("johnsnow", "password") - user = type('User', (), {'name': 'johnsnow'}) + user = type("User", (), {"name": "johnsnow"}) auth.delete_user(user) - user_info = UserInfo.find(app.db, 'johnsnow') + user_info = UserInfo.find(app.db, "johnsnow") assert not user_info async def test_import_from_firstuse_dont_delete_db_after(tmpcwd, app): - with dbm.open('passwords.dbm', 'c', 0o600) as db: - db['user1'] = 'password' + with dbm.open("passwords.dbm", "c", 0o600) as db: + db["user1"] = "password" auth = NativeAuthenticator(db=app.db) auth.add_data_from_firstuse() files = os.listdir() - assert UserInfo.find(app.db, 'user1') - assert ('passwords.dbm' in files) or ('passwords.dbm.db' in files) + assert UserInfo.find(app.db, "user1") + assert ("passwords.dbm" in files) or ("passwords.dbm.db" in files) async def test_import_from_firstuse_delete_db_after(tmpcwd, app): - with dbm.open('passwords.dbm', 'c', 0o600) as db: - db['user1'] = 'password' + with dbm.open("passwords.dbm", "c", 0o600) as db: + db["user1"] = "password" auth = NativeAuthenticator(db=app.db) auth.delete_firstuse_db_after_import = True auth.add_data_from_firstuse() files = os.listdir() - assert UserInfo.find(app.db, 'user1') - assert ('passwords.dbm' not in files) and ('passwords.dbm.db' not in files) + assert UserInfo.find(app.db, "user1") + assert ("passwords.dbm" not in files) and ("passwords.dbm.db" not in files) @pytest.mark.parametrize( "user,pwd", [ - ('user1', 'password'), - ('user 1', 'somethingelsereallysecure'), + ("user1", "password"), + ("user 1", "somethingelsereallysecure"), ], ) async def test_import_from_firstuse_invalid_password(user, pwd, tmpcwd, app): - with dbm.open('passwords.dbm', 'c', 0o600) as db: + with dbm.open("passwords.dbm", "c", 0o600) as db: db[user] = pwd auth = NativeAuthenticator(db=app.db) @@ -348,7 +348,7 @@ async def test_import_from_firstuse_invalid_password(user, pwd, tmpcwd, app): async def test_secret_key(app): auth = NativeAuthenticator(db=app.db) auth.ask_email_on_signup = False - auth.allow_self_approval_for = '.*@some-domain.com$' + auth.allow_self_approval_for = ".*@some-domain.com$" auth.secret_key = "short" with pytest.raises(ValueError): @@ -362,7 +362,7 @@ async def test_secret_key(app): async def test_approval_url(app): auth = NativeAuthenticator(db=app.db) - auth.allow_self_approval_for = '.*@some-domain.com$' + auth.allow_self_approval_for = ".*@some-domain.com$" auth.secret_key = "very long and kind-of random asdgaisgfjbafksdgasg" auth.setup_self_approval() diff --git a/nativeauthenticator/tests/test_orm.py b/nativeauthenticator/tests/test_orm.py index d530f60..97c29ce 100644 --- a/nativeauthenticator/tests/test_orm.py +++ b/nativeauthenticator/tests/test_orm.py @@ -17,25 +17,25 @@ def app(): return hub -@pytest.mark.parametrize('email', ['john', 'john@john']) +@pytest.mark.parametrize("email", ["john", "john@john"]) def test_validate_method_wrong_email(email, tmpdir, app): with pytest.raises(AssertionError): - UserInfo(username='john', password=b'pwd', email=email) + UserInfo(username="john", password=b"pwd", email=email) def test_validate_method_correct_email(tmpdir, app): - user = UserInfo(username='john', password=b'pwd', email='john@john.com') + user = UserInfo(username="john", password=b"pwd", email="john@john.com") app.db.add(user) app.db.commit() - assert UserInfo.find(app.db, 'john') + assert UserInfo.find(app.db, "john") def test_all_users(tmpdir, app): assert len(UserInfo.all_users(app.db)) == 1 user = UserInfo( - username='daenerystargaryen', - password=b'yesispeakvalyrian', - email='khaleesi@valyria.com', + username="daenerystargaryen", + password=b"yesispeakvalyrian", + email="khaleesi@valyria.com", ) app.db.add(user) app.db.commit() @@ -45,6 +45,6 @@ def test_all_users(tmpdir, app): def test_wrong_pwd_type(tmpdir, app): with pytest.raises(StatementError): - user = UserInfo(username='john', password='pwd', email='john@john.com') + user = UserInfo(username="john", password="pwd", email="john@john.com") app.db.add(user) - UserInfo.find(app.db, 'john') + UserInfo.find(app.db, "john") diff --git a/setup.py b/setup.py index 6e35989..feb8aa4 100644 --- a/setup.py +++ b/setup.py @@ -5,16 +5,16 @@ long_description = fh.read() setup( - name='jupyterhub-nativeauthenticator', - version='1.0.5', - description='JupyterHub Native Authenticator', + name="jupyterhub-nativeauthenticator", + version="1.0.5", + description="JupyterHub Native Authenticator", long_description=long_description, long_description_content_type="text/markdown", - url='https://github.com/jupyterhub/nativeauthenticator', - author='Leticia Portella', - author_email='leportella@protonmail.com', - license='3 Clause BSD', + url="https://github.com/jupyterhub/nativeauthenticator", + author="Leticia Portella", + author_email="leportella@protonmail.com", + license="3 Clause BSD", packages=find_packages(), - install_requires=['jupyterhub>=1.3', 'bcrypt', 'onetimepass'], + install_requires=["jupyterhub>=1.3", "bcrypt", "onetimepass"], include_package_data=True, )