Skip to content

Commit

Permalink
add number verification to network auth
Browse files Browse the repository at this point in the history
  • Loading branch information
maxkahan committed Oct 16, 2024
1 parent 2e925b0 commit 315b6dc
Show file tree
Hide file tree
Showing 15 changed files with 298 additions and 110 deletions.
2 changes: 1 addition & 1 deletion network_auth/src/vonage_network_auth/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.1b0'
__version__ = '1.0.0'
83 changes: 74 additions & 9 deletions network_auth/src/vonage_network_auth/network_auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from urllib.parse import urlencode, urlunparse

from pydantic import validate_call
from vonage_http_client.http_client import HttpClient
from vonage_network_auth.requests import CreateOidcUrl

from .responses import OidcResponse, TokenResponse

Expand All @@ -24,8 +27,54 @@ def http_client(self) -> HttpClient:
return self._http_client

@validate_call
def get_oauth2_user_token(self, number: str, scope: str) -> str:
"""Get an OAuth2 user token for a given number and scope.
def get_oidc_url(self, url_settings: CreateOidcUrl) -> str:
"""Get the URL to use for authentication in a front-end application.
Args:
url_settings (CreateOidcUrl): The settings to use for the URL. Settings include:
- redirect_uri (str): The URI to redirect to after authentication.
- state (str): A unique identifier for the request. Can be any string.
- login_hint (str): The phone number to use for the request.
Returns:
str: The URL to use to make an OIDC request in a front-end application.
"""
base_url = 'https://oidc.idp.vonage.com/oauth2/auth'

params = {
'client_id': self._http_client.auth.application_id,
'redirect_uri': url_settings.redirect_uri,
'response_type': 'code',
'scope': url_settings.scope,
'state': url_settings.state,
'login_hint': self._ensure_plus_prefix(url_settings.login_hint),
}

full_url = urlunparse(('', '', base_url, '', urlencode(params), ''))
return full_url

@validate_call
def get_number_verification_camara_token(self, code: str, redirect_uri: str) -> str:
"""Exchange an OIDC authorization code for a CAMARA access token.
Args:
code (str): The authorization code to use.
redirect_uri (str): The URI to redirect to after authentication.
Returns:
str: The access token to use for further requests.
"""
params = {
'code': code,
'redirect_uri': redirect_uri,
'grant_type': 'authorization_code',
}
return self._request_access_token(params).access_token

@validate_call
def get_sim_swap_camara_token(self, number: str, scope: str) -> str:
"""Get an OAuth2 user token for a given number and scope, to do a sim swap check.
A CAMARA token is requested using the number and scope, and the token is returned.
Args:
number (str): The phone number to authenticate.
Expand All @@ -34,13 +83,15 @@ def get_oauth2_user_token(self, number: str, scope: str) -> str:
Returns:
str: The OAuth2 user token.
"""
oidc_response = self.make_oidc_request(number, scope)
token_response = self.request_access_token(oidc_response.auth_req_id)
oidc_response = self.make_oidc_auth_id_request(number, scope)
token_response = self.request_sim_swap_access_token(oidc_response.auth_req_id)
return token_response.access_token

@validate_call
def make_oidc_request(self, number: str, scope: str) -> OidcResponse:
"""Make an OIDC request to authenticate a user.
def make_oidc_auth_id_request(self, number: str, scope: str) -> OidcResponse:
"""Make an OIDC request for an authentication ID. The auth ID is then used to
request a JWT. Returns a response containing the authentication request ID that
can be used to generate an authorised JWT. Follows the Camara standard.
Args:
number (str): The phone number to authenticate.
Expand All @@ -62,11 +113,11 @@ def make_oidc_request(self, number: str, scope: str) -> OidcResponse:
return OidcResponse(**response)

@validate_call
def request_access_token(
def request_sim_swap_access_token(
self, auth_req_id: str, grant_type: str = 'urn:openid:params:grant-type:ciba'
) -> TokenResponse:
"""Request a Camara access token using an authentication request ID given as a
response to an OIDC request.
"""Request a Camara access token for a SIM Swap check using an authentication
request ID given as a response to an OIDC request.
Args:
auth_req_id (str): The authentication request ID.
Expand All @@ -77,6 +128,20 @@ def request_access_token(
"""
params = {'auth_req_id': auth_req_id, 'grant_type': grant_type}

return self._request_access_token(params)

@validate_call
def _request_access_token(self, params: dict) -> TokenResponse:
"""Request a Camara access token using an authentication request ID given as a
response to an OIDC request.
Args:
auth_req_id (str): The authentication request ID.
grant_type (str, optional): The grant type.
Returns:
TokenResponse: A response containing the access token.
"""
response = self._http_client.post(
self._host,
'/oauth2/token',
Expand Down
20 changes: 20 additions & 0 deletions network_auth/src/vonage_network_auth/requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import Optional

from pydantic import BaseModel


class CreateOidcUrl(BaseModel):
"""Model to craft a URL for OIDC authentication.
Args:
redirect_uri (str): The URI to redirect to after authentication.
state (str): A unique identifier for the request. Can be any string.
login_hint (str): The phone number to use for the request.
"""

redirect_uri: str
state: str
login_hint: str
scope: Optional[
str
] = 'openid dpv:FraudPreventionAndDetection#number-verification-verify-read'
42 changes: 37 additions & 5 deletions network_auth/tests/test_network_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_oidc_request():
'oidc_request.json',
)

response = network_auth.make_oidc_request(
response = network_auth.make_oidc_auth_id_request(
number='447700900000',
scope='dpv:FraudPreventionAndDetection#check-sim-swap',
)
Expand All @@ -40,7 +40,7 @@ def test_oidc_request():


@responses.activate
def test_request_access_token():
def test_sim_swap_token():
build_response(
path,
'POST',
Expand All @@ -54,7 +54,7 @@ def test_request_access_token():
'interval': '2',
}
oidc_response = OidcResponse(**oidc_response_dict)
response = network_auth.request_access_token(oidc_response.auth_req_id)
response = network_auth.request_sim_swap_access_token(oidc_response.auth_req_id)

assert (
response.access_token
Expand All @@ -79,7 +79,7 @@ def test_whole_oauth2_flow():
'token_request.json',
)

access_token = network_auth.get_oauth2_user_token(
access_token = network_auth.get_sim_swap_camara_token(
number='447700900000', scope='dpv:FraudPreventionAndDetection#check-sim-swap'
)
assert (
Expand All @@ -104,8 +104,40 @@ def test_oidc_request_permissions_error():
)

with raises(HttpRequestError) as err:
response = network_auth.make_oidc_request(
network_auth.make_oidc_auth_id_request(
number='447700900000',
scope='dpv:FraudPreventionAndDetection#check-sim-swap',
)
assert err.match('"title": "Bad Request"')


def test_get_oidc_url():
url_options = {
'redirect_uri': 'https://example.com/callback',
'state': 'state_id',
'login_hint': '447700900000',
}
response = network_auth.get_oidc_url(url_options)

assert (
response
== 'https://oidc.idp.vonage.com/oauth2/auth?client_id=test_application_id&redirect_uri=https%3A%2F%2Fexample.com%2Fcallback&response_type=code&scope=openid+dpv%3AFraudPreventionAndDetection%23number-verification-verify-read&state=state_id&login_hint=%2B447700900000'
)


@responses.activate
def test_get_number_verification_camara_token():
build_response(
path,
'POST',
'https://api-eu.vonage.com/oauth2/token',
'token_request.json',
)
token = network_auth.get_number_verification_camara_token(
'code', 'https://example.com/redirect'
)

assert (
token
== 'eyJhbGciOiJSUzI1NiIsImprdSI6Imh0dHBzOi8vYW51YmlzLWNlcnRzLWMxLWV1dzEucHJvZC52MS52b25hZ2VuZXR3b3Jrcy5uZXQvandrcyIsImtpZCI6IkNOPVZvbmFnZSAxdmFwaWd3IEludGVybmFsIENBOjoxOTUxODQ2ODA3NDg1NTYwNjYzODY3MTM0NjE2MjU2MTU5MjU2NDkiLCJ0eXAiOiJKV1QiLCJ4NXUiOiJodHRwczovL2FudWJpcy1jZXJ0cy1jMS1ldXcxLnByb2QudjEudm9uYWdlbmV0d29ya3MubmV0L3YxL2NlcnRzLzA4NjliNDMyZTEzZmIyMzcwZTk2ZGI4YmUxMDc4MjJkIn0.eyJwcmluY2lwYWwiOnsiYXBpS2V5IjoiNGI1MmMwMGUiLCJhcHBsaWNhdGlvbklkIjoiMmJlZTViZWQtNmZlZS00ZjM2LTkxNmQtNWUzYjRjZDI1MjQzIiwibWFzdGVyQWNjb3VudElkIjoiNGI1MmMwMGUiLCJjYXBhYmlsaXRpZXMiOlsibmV0d29yay1hcGktZmVhdHVyZXMiXSwiZXh0cmFDb25maWciOnsiY2FtYXJhU3RhdGUiOiJmb0ZyQndnOFNmeGMydnd2S1o5Y3UrMlgrT0s1K2FvOWhJTTVGUGZMQ1dOeUlMTHR3WmY1dFRKbDdUc1p4QnY4QWx3aHM2bFNWcGVvVkhoWngvM3hUenFRWVkwcHpIZE5XL085ZEdRN1RKOE9sU1lDdTFYYXFEcnNFbEF4WEJVcUpGdnZTTkp5a1A5ZDBYWVN4ajZFd0F6UUFsNGluQjE1c3VMRFNsKy82U1FDa29Udnpld0tvcFRZb0F5MVg2dDJVWXdEVWFDNjZuOS9kVWxIemN3V0NGK3QwOGNReGxZVUxKZyt3T0hwV2xvWGx1MGc3REx0SCtHd0pvRGJoYnMyT2hVY3BobGZqajBpeHQ1OTRsSG5sQ1NYNkZrMmhvWEhKUW01S3JtOVBKSmttK0xTRjVsRTd3NUxtWTRvYTFXSGpkY0dwV1VsQlNQY000YnprOGU0bVE9PSJ9fSwiZmVkZXJhdGVkQXNzZXJ0aW9ucyI6e30sImF1ZCI6ImFwaS1ldS52b25hZ2UuY29tIiwiZXhwIjoxNzE3MDkyODY4LCJqdGkiOiJmNDZhYTViOC1hODA2LTRjMzctODQyMS02OGYwMzJjNDlhMWYiLCJpYXQiOjE3MTcwOTE5NzAsImlzcyI6IlZJQU0tSUFQIiwibmJmIjoxNzE3MDkxOTU1fQ.iLUbyDPR1HGLKh29fy6fqK65Q1O7mjWOletAEPJD4eu7gb0E85EL4M9R7ckJq5lIvgedQt3vBheTaON9_u-VYjMqo8ulPoEoGUDHbOzNbs4MmCW0_CRdDPGyxnUhvcbuJhPgnEHxmfHjJBljncUnk-Z7XCgyNajBNXeQQnHkRF_6NMngxJ-qjjhqbYL0VsF_JS7-TXxixNL0KAFl0SeN2DjkfwRBCclP-69CTExDjyOvouAcchqi-6ZYj_tXPCrTADuzUrQrW8C5nHp2-XjWJSFKzyvi48n8V1U6KseV-eYzBzvy7bJf0tRMX7G6gctTYq3DxdC_eXvXlnp1zx16mg'
)
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
from .number_verification import NetworkNumberVerification
from .errors import NetworkNumberVerificationError
from .number_verification import CreateOidcUrl, NetworkNumberVerification
from .requests import NumberVerificationRequest
from .responses import NumberVerificationResponse

__all__ = ['NetworkNumberVerification']
__all__ = [
'NetworkNumberVerification',
'CreateOidcUrl',
'NumberVerificationRequest',
'NumberVerificationResponse',
'NetworkNumberVerificationError',
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from vonage_utils import VonageError


class NetworkNumberVerificationError(VonageError):
"""Base class for Vonage Network Number Verification errors."""
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from urllib.parse import urlencode, urlunparse

from pydantic import validate_call
from vonage_http_client import HttpClient
from vonage_network_auth import NetworkAuth
from vonage_network_number_verification.requests import CreateOidcUrl
from vonage_network_auth.requests import CreateOidcUrl
from vonage_network_number_verification.requests import NumberVerificationRequest
from vonage_network_number_verification.responses import NumberVerificationResponse


Expand Down Expand Up @@ -40,40 +39,24 @@ def get_oidc_url(self, url_settings: CreateOidcUrl) -> str:
Returns:
str: The URL to use to make an OIDC request in a front-end application.
"""
base_url = 'https://oidc.idp.vonage.com/oauth2/auth'

params = {
'client_id': self._http_client.auth.application_id,
'redirect_uri': url_settings.redirect_uri,
'response_type': 'code',
'scope': url_settings.scope,
}
if url_settings.state is not None:
params['state'] = url_settings.state
if url_settings.login_hint is not None:
if url_settings.login_hint.startswith('+'):
params['login_hint'] = url_settings.login_hint
else:
params['login_hint'] = f'+{url_settings.login_hint}'

full_url = urlunparse(('', '', base_url, '', urlencode(params), ''))
return full_url
return self._network_auth.get_oidc_url(url_settings)

@validate_call
def get_oidc_token(
self, oidc_response: dict, grant_type: str = 'urn:openid:params:grant-type:ciba'
):
"""Request a Camara token using an authentication request ID given as a response
to the OIDC request."""
params = {
'grant_type': grant_type,
'auth_req_id': oidc_response['auth_req_id'],
}
return self._request_camara_token(params)
def exchange_code_for_token(self, code: str, redirect_uri: str) -> str:
"""Exchange an OIDC authorization code for a CAMARA access token.
Args:
code (str): The authorization code to use.
redirect_uri (str): The URI to redirect to after authentication.
Returns:
str: The access token to use for further requests.
"""
return self._network_auth.get_number_verification_camara_token(code, redirect_uri)

@validate_call
def verify(
self, access_token: str, phone_number: str = None, hashed_phone_number: str = None
self, number_verification_params: NumberVerificationRequest
) -> NumberVerificationResponse:
"""Verify if the specified phone number matches the one that the user is currently
using.
Expand All @@ -91,13 +74,18 @@ def verify(
NumberVerificationResponse: Class containing the Number Verification response
containing the device verification information.
"""
return self._http_client.post(
params = {}
if number_verification_params.phone_number is not None:
params = {'phoneNumber': number_verification_params.phone_number}
else:
params = {'hashedPhoneNumber': number_verification_params.hashed_phone_number}

response = self._http_client.post(
self._host,
'/camara/number-verification/v031/verify',
params={
'phoneNumber': phone_number,
'hashedPhoneNumber': hashed_phone_number,
},
params=params,
auth_type=self._auth_type,
token=access_token,
token=number_verification_params.access_token,
)

return NumberVerificationResponse(**response)
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
from typing import Optional

from pydantic import BaseModel, Field


class CreateOidcUrl(BaseModel):
"""Model to craft a URL for OIDC authentication.
Args:
redirect_uri (str): The URI to redirect to after authentication.
state (str, optional): A unique identifier for the request. Can be any string.
login_hint (str, optional): The phone number to use for the request.
"""

redirect_uri: str
state: Optional[str] = None
login_hint: Optional[str] = None
scope: Optional[
str
] = 'openid dpv:FraudPreventionAndDetection#number-verification-verify-read'
from pydantic import BaseModel, Field, model_validator
from vonage_network_number_verification.errors import NetworkNumberVerificationError


class NumberVerificationRequest(BaseModel):
"""Model for the request to verify a phone number.
Args:
access_token (str): The access token for the request obtained from the
three-legged OAuth2 flow.
phone_number (str): The phone number to verify. Use the E.164 format with
or without a leading +.
hashed_phone_number (str): The hashed phone number to verify.
"""

phone_number: str = Field(..., alias='phoneNumber')
hashed_phone_number: str = Field(..., alias='hashedPhoneNumber')
access_token: str
phone_number: str = Field(None, serialization_alias='phoneNumber')
hashed_phone_number: str = Field(None, serialization_alias='hashedPhoneNumber')

@model_validator(mode='after')
def check_only_one_phone_number(self):
"""Check that only one of `phone_number` and `hashed_phone_number` is set."""

if self.phone_number is not None and self.hashed_phone_number is not None:
raise NetworkNumberVerificationError(
'Only one of `phone_number` and `hashed_phone_number` can be set.'
)

if self.phone_number is None and self.hashed_phone_number is None:
raise NetworkNumberVerificationError(
'One of `phone_number` and `hashed_phone_number` must be set.'
)

return self
Loading

0 comments on commit 315b6dc

Please sign in to comment.