Skip to content

Commit

Permalink
Divide responsabilities while checking blocked users
Browse files Browse the repository at this point in the history
The previous version to check if a user exceeded a number of failed
logins everything was checked in a single method. This commit splits the
responsabilities between 4 functions with small responsabilites each
  • Loading branch information
leportella committed Jan 23, 2019
1 parent 8c9e061 commit 3197739
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
43 changes: 30 additions & 13 deletions nativeauthenticator/nativeauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class NativeAuthenticator(Authenticator):
help="""Configures the number of failed attempts a user can have
before being blocked."""
)
secs_before_next_try = Integer(
seconds_before_next_try = Integer(
config=True,
default=600,
help="""Configures the number of seconds a user has to wait
Expand All @@ -52,25 +52,39 @@ def add_new_table(self):
if 'users_info' not in inspector.get_table_names():
UserInfo.__table__.create(self.db.bind)

def exceed_atempts_of_login(self, username):
now = datetime.now()
def add_login_attempt(self, username):
if not self.login_attempts.get(username):
self.login_attempts[username] = {'count': 1,
'time': datetime.now()}
else:
self.login_attempts[username]['count'] += 1
self.login_attempts[username]['time'] = datetime.now()

def can_try_to_login_again(self, username):
login_attempts = self.login_attempts.get(username)
if not login_attempts:
self.login_attempts[username] = {'count': 1, 'time': now}
return False
return True

time_last_attempt = now - login_attempts['time']
if time_last_attempt.seconds > self.secs_before_next_try:
self.login_attempts.pop(username)
return False
time_last_attempt = datetime.now() - login_attempts['time']
if time_last_attempt.seconds > self.seconds_before_next_try:
return True

if login_attempts['count'] < self.allowed_failed_logins:
self.login_attempts[username]['count'] += 1
self.login_attempts[username]['time'] = now
return False

def is_blocked(self, username):
logins = self.login_attempts.get(username)

if not logins or logins['count'] < self.allowed_failed_logins:
return False

if self.can_try_to_login_again(username):
return False
return True

def successful_login(self, username):
if self.login_attempts.get(username):
self.login_attempts.pop(username)

@gen.coroutine
def authenticate(self, handler, data):
username = data['username']
Expand All @@ -81,12 +95,15 @@ def authenticate(self, handler, data):
return

if self.allowed_failed_logins:
if self.exceed_atempts_of_login(username):
if self.is_blocked(username):
return

if user.is_authorized and user.is_valid_password(password):
self.successful_login(username)
return username

self.add_login_attempt(username)

def is_password_common(self, password):
common_credentials_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
Expand Down
33 changes: 23 additions & 10 deletions nativeauthenticator/tests/test_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,33 @@ async def test_handlers(app):
assert handlers[2][0] == '/authorize'


async def test_exceed_atempts_of_login(tmpcwd, app):
async def test_add_new_attempt_of_login(tmpcwd, app):
auth = NativeAuthenticator(db=app.db)
auth.allowed_failed_logins = 3
auth.secs_before_next_try = 10

username = 'John Snow'
auth.get_or_create_user(username, 'password')
assert not auth.login_attempts
auth.add_login_attempt('username')
assert auth.login_attempts['username']['count'] == 1
auth.add_login_attempt('username')
assert auth.login_attempts['username']['count'] == 2

for i in range(3):
assert not auth.exceed_atempts_of_login(username)

assert auth.exceed_atempts_of_login(username)
time.sleep(12)
assert not auth.exceed_atempts_of_login(username)
async def test_authentication_login_count(tmpcwd, app):
auth = NativeAuthenticator(db=app.db)
infos = {'username': 'johnsnow', 'password': 'password'}
wrong_infos = {'username': 'johnsnow', 'password': 'wrong_password'}
auth.get_or_create_user(infos['username'], infos['password'])
UserInfo.change_authorization(app.db, 'johnsnow')

assert not auth.login_attempts

await auth.authenticate(app, wrong_infos)
assert auth.login_attempts['johnsnow']['count'] == 1

await auth.authenticate(app, wrong_infos)
assert auth.login_attempts['johnsnow']['count'] == 2

await auth.authenticate(app, infos)
assert not auth.login_attempts.get('johnsnow')


async def test_authentication_with_exceed_atempts_of_login(tmpcwd, app):
Expand Down

0 comments on commit 3197739

Please sign in to comment.