From 49d51cdeb93635e6724e2646477003d20a1cac4a Mon Sep 17 00:00:00 2001 From: Sumit Singh Date: Sat, 4 Dec 2021 15:29:49 +0530 Subject: [PATCH] Add tests and type annotate utils --- drf_user/utils.py | 108 ++++++++++++++++++++++---------------------- tests/test_utils.py | 44 ++++++++++++++++++ tests/test_views.py | 4 +- 3 files changed, 100 insertions(+), 56 deletions(-) diff --git a/drf_user/utils.py b/drf_user/utils.py index 7d0526e..52ec262 100644 --- a/drf_user/utils.py +++ b/drf_user/utils.py @@ -1,5 +1,9 @@ """Collection of general helper functions.""" +from __future__ import annotations + import datetime +from typing import Dict +from typing import Optional import pytz from django.http import HttpRequest @@ -18,11 +22,11 @@ from drf_user.models import OTPValidation from drf_user.models import User -user_settings = update_user_settings() -otp_settings = user_settings["OTP"] +user_settings: Dict[str, bool | Dict] = update_user_settings() +otp_settings: Dict[str, str | int] = user_settings["OTP"] -def get_client_ip(request: HttpRequest): +def get_client_ip(request: HttpRequest) -> Optional[str]: """ Fetches the IP address of a client from Request and return in proper format. @@ -34,7 +38,7 @@ def get_client_ip(request: HttpRequest): Returns ------- - ip: str + ip: str | None """ x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") if x_forwarded_for: @@ -43,7 +47,7 @@ def get_client_ip(request: HttpRequest): return request.META.get("REMOTE_ADDR") -def datetime_passed_now(source): +def datetime_passed_now(source: datetime.datetime) -> bool: """ Compares provided datetime with current time on the basis of Django settings. Checks source is in future or in past. False if it's in future. @@ -63,7 +67,7 @@ def datetime_passed_now(source): return source <= datetime.datetime.now() -def check_unique(prop, value): +def check_unique(prop: str, value: str) -> bool: """ This function checks if the value provided is present in Database or can be created in DBMS as unique data. @@ -92,7 +96,7 @@ def check_unique(prop, value): return user.count() == 0 -def generate_otp(prop, value): +def generate_otp(prop: str, value: str) -> OTPValidation: """ This function generates an OTP and saves it into Model. It also sets various counters, such as send_counter, @@ -120,7 +124,7 @@ def generate_otp(prop, value): 5039164 """ # Create a random number - random_number = User.objects.make_random_password( + random_number: str = User.objects.make_random_password( length=otp_settings["LENGTH"], allowed_chars=otp_settings["ALLOWED_CHARS"] ) @@ -129,16 +133,16 @@ def generate_otp(prop, value): while OTPValidation.objects.filter(otp__exact=random_number).filter( is_validated=False ): - random_number = User.objects.make_random_password( + random_number: str = User.objects.make_random_password( length=otp_settings["LENGTH"], allowed_chars=otp_settings["ALLOWED_CHARS"] ) # Get or Create new instance of Model with value of provided value # and set proper counter. try: - otp_object = OTPValidation.objects.get(destination=value) + otp_object: OTPValidation = OTPValidation.objects.get(destination=value) except OTPValidation.DoesNotExist: - otp_object = OTPValidation() + otp_object: OTPValidation = OTPValidation() otp_object.destination = value else: if not datetime_passed_now(otp_object.reactive_at): @@ -159,7 +163,7 @@ def generate_otp(prop, value): return otp_object -def send_otp(value, otpobj, recip): +def send_otp(value: str, otpobj: OTPValidation, recip: str) -> Dict: """ This function sends OTP to specified value. Parameters @@ -176,27 +180,22 @@ def send_otp(value, otpobj, recip): ------- """ - otp = otpobj.otp + otp: str = otpobj.otp if not datetime_passed_now(otpobj.reactive_at): raise PermissionDenied( - detail=_("OTP sending not allowed until: " + str(otpobj.reactive_at)) + detail=_(f"OTP sending not allowed until: {otpobj.reactive_at}") ) message = ( - "OTP for verifying " - + otpobj.get_prop_display() - + ": " - + value - + " is " - + otp - + ". Don't share this with anyone!" + f"OTP for verifying {otpobj.get_prop_display()}: {value} is {otp}." + f" Don't share this with anyone!" ) try: - rdata = send_message(message, otp_settings["SUBJECT"], [value], [recip]) + rdata: dict = send_message(message, otp_settings["SUBJECT"], [value], [recip]) except ValueError as err: - raise APIException(_("Server configuration error occured: %s") % str(err)) + raise APIException(_(f"Server configuration error occurred: {err}")) otpobj.reactive_at = timezone.now() + datetime.timedelta( minutes=otp_settings["COOLING_PERIOD"] @@ -206,7 +205,7 @@ def send_otp(value, otpobj, recip): return rdata -def login_user(user: User, request: HttpRequest) -> dict: +def login_user(user: User, request: HttpRequest) -> Dict[str, str]: """ This function is used to login a user. It saves the authentication in AuthTransaction model. @@ -221,7 +220,7 @@ def login_user(user: User, request: HttpRequest) -> dict: dict: Generated JWT tokens for user. """ - token = RefreshToken.for_user(user) + token: RefreshToken = RefreshToken.for_user(user) # Add custom claims if hasattr(user, "email"): @@ -252,7 +251,7 @@ def login_user(user: User, request: HttpRequest) -> dict: } -def check_validation(value): +def check_validation(value: str) -> bool: """ This functions check if given value is already validated via OTP or not. Parameters @@ -272,13 +271,13 @@ def check_validation(value): """ try: - otp_object = OTPValidation.objects.get(destination=value) + otp_object: OTPValidation = OTPValidation.objects.get(destination=value) return otp_object.is_validated except OTPValidation.DoesNotExist: return False -def validate_otp(value, otp): +def validate_otp(value: str, otp: int) -> bool: """ This function is used to validate the OTP for a particular value. It also reduces the attempt count by 1 and resets OTP. @@ -295,32 +294,9 @@ def validate_otp(value, otp): """ try: # Try to get OTP Object from Model and initialize data dictionary - otp_object = OTPValidation.objects.get(destination=value, is_validated=False) - - # Decrement validate_attempt - otp_object.validate_attempt -= 1 - - if str(otp_object.otp) == str(otp): - otp_object.is_validated = True - otp_object.save() - return True - - elif otp_object.validate_attempt <= 0: - generate_otp(otp_object.prop, value) - raise AuthenticationFailed( - detail=_("Incorrect OTP. Attempt exceeded! OTP has been " "reset.") - ) - - else: - otp_object.save() - raise AuthenticationFailed( - detail=_( - "OTP Validation failed! " - + str(otp_object.validate_attempt) - + " attempts left!" - ) - ) - + otp_object: OTPValidation = OTPValidation.objects.get( + destination=value, is_validated=False + ) except OTPValidation.DoesNotExist: raise NotFound( detail=_( @@ -328,3 +304,27 @@ def validate_otp(value, otp): "destination. Kindly send an OTP first" ) ) + # Decrement validate_attempt + otp_object.validate_attempt -= 1 + + if str(otp_object.otp) == str(otp): + # match otp + otp_object.is_validated = True + otp_object.save() + return True + + elif otp_object.validate_attempt <= 0: + # check if attempts exceeded and regenerate otp and raise error + generate_otp(otp_object.prop, value) + raise AuthenticationFailed( + detail=_("Incorrect OTP. Attempt exceeded! OTP has been reset.") + ) + + else: + # update attempts and raise error + otp_object.save() + raise AuthenticationFailed( + detail=_( + f"OTP Validation failed! {otp_object.validate_attempt} attempts left!" + ) + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 3ebdacd..fe18c47 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,6 +2,7 @@ import datetime import pytest +from django.http import HttpRequest from django.test import TestCase from django.utils import timezone from model_bakery import baker @@ -10,6 +11,7 @@ from drf_user import utils as utils from drf_user.models import OTPValidation from drf_user.models import User +from drf_user.utils import get_client_ip class TestCheckUnique(TestCase): @@ -145,3 +147,45 @@ def test_validate_otp_raises_invalid_otp_exception(self): "OTP Validation failed! 2 attempts left!", str(context_manager.exception.detail), ) + + +class TestGetClientIP(TestCase): + """get_client_ip test""" + + def test_meta_none(self): + """Check get_client_ip returns None when META is empty""" + request = HttpRequest() + request.META = {} + ip = get_client_ip(request) + self.assertIsNone(ip) + + def test_meta_single_with_http_x_forwarded_for(self): + """Check get_client_ip returns first item from HTTP_X_FORWARDED_FOR""" + request = HttpRequest() + request.META = { + "HTTP_X_FORWARDED_FOR": "177.139.233.139, 198.84.193.157, 198.84.193.158", + } + result = get_client_ip(request) + self.assertEqual(result, "177.139.233.139") + + def test_meta_single_with_remote_addr(self): + """Check get_client_ip returns first item from HTTP_X_FORWARDED_FOR""" + request = HttpRequest() + request.META = { + "REMOTE_ADDR": "198.84.193.158", + } + result = get_client_ip(request) + self.assertEqual(result, "198.84.193.158") + + def test_meta_multi(self): + """ + Check get_client_ip returns ip from HTTP_X_FORWARDED_FOR when + HTTP_X_FORWARDED_FOR and REMOTE_ADDR is present + """ + request = HttpRequest() + request.META = { + "HTTP_X_FORWARDED_FOR": "177.139.233.139, 198.84.193.157, 198.84.193.158", + "REMOTE_ADDR": "177.139.233.133", + } + result = get_client_ip(request) + self.assertEqual(result, "177.139.233.139") diff --git a/tests/test_views.py b/tests/test_views.py index de7de33..02952fb 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -336,7 +336,7 @@ def test_raise_api_exception_when_email_invalid(self): self.assertEqual(500, response.status_code) self.assertEqual( - "Server configuration error occured: Invalid recipient.", + "Server configuration error occurred: Invalid recipient.", response.json()["detail"], ) @@ -584,7 +584,7 @@ def test_login_with_incorrect_email_mobile(self): # when drf_addons is updated self.assertEqual(500, response.status_code) self.assertEqual( - "Server configuration error occured: Invalid recipient.", + "Server configuration error occurred: Invalid recipient.", response.json()["detail"], )