Skip to content

Commit

Permalink
Added Device Authentication Support (#216)
Browse files Browse the repository at this point in the history
* Added Device Authentication Support

* Update README.md

* Update README.md

* Update aws_srp.py

* Update README.md

* Update README.md

* file rename

* Update README.md
  • Loading branch information
arshvimal authored Jan 22, 2024
1 parent 59b1229 commit 2590f2f
Show file tree
Hide file tree
Showing 2 changed files with 235 additions and 1 deletion.
69 changes: 69 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ Makes working with AWS Cognito easier for Python developers.
- [Respond to SMS MFA challenge](#respond-to-sms-mfa-challenge)
- [Cognito SRP Utility](#cognito-srp-utility)
- [Using AWSSRP](#using-awssrp)
- [Device Authentication Support](#device-authentication-support)
- [Receiving DeviceKey and DeviceGroupKey](#receiving-devicekey-and-devicegroupkey)
- [Confirming a Device](#confirming-a-device)
- [Updating Device Status](#updating-device-status)
- [Authenticating your Device](#authenticating-your-device)
- [Forget Device](#forget-device)
- [SRP Requests Authenticator](#srp-requests-authenticator)

## Python Versions Supported
Expand Down Expand Up @@ -672,6 +678,69 @@ aws = AWSSRP(username='username', password='password', pool_id='user_pool_id',
tokens = aws.authenticate_user()
```

## Device Authentication Support

You must use the `USER_SRP_AUTH` authentication flow to use the device tracking feature. Read more about [Remembered Devices](https://repost.aws/knowledge-center/cognito-user-pool-remembered-devices)

### Receiving DeviceKey and DeviceGroupKey

Once the `authenticate_user` class method is used for SRP authentication, the response also returns `DeviceKey` and `DeviceGrouKey`.
These Keys will later be used to confirm the device.

```python
import boto3
from pycognito.aws_srp import AWSSRP

client = boto3.client('cognito-idp')
aws = AWSSRP(username='username', password='password', pool_id='user_pool_id',
client_id='client_id', client=client)
tokens = aws.authenticate_user()
device_key = tokens["AuthenticationResult"]["NewDeviceMetadata"]["DeviceKey"]
device_group_key = tokens["AuthenticationResult"]["NewDeviceMetadata"]["DeviceGroupKey"]
```
### Confirming a Device

The `confirm_device` class method is used for confirming a device, it takes two inputs, `tokens` and `DeviceName` (`DeviceName` is optional).
The method returns two values, `response` and `device_password`. `device_password` will later be used to authenticate your device with
the Cognito user pool.

```python
response, device_password = user.confirm_device(tokens=tokens)
```

### Updating Device Status

The `update_device_status` class method is used to update whether or not your device should be remembered. This method takes
three inputs, `isRemembered`, `access_token` and `device_key`. `isRemembered` is a boolean value, which sets the device status as
`"remembered"` on `True` and `"not_remembered"` on `False`, `access_token` is the Access Token provided by Cognito and `device_key` is the key
provided by the `authenticate_user` method.

```python
response = user.update_device_status(False, tokens["AuthenticationResult"]["AccessToken"], device_key)
```

### Authenticating your Device

To authenticate your Device, you can just add `device_key`, `device_group_key` and `device_password` to the AWSSRP class.

```python
import boto3
from pycognito.aws_srp import AWSSRP

client = boto3.client('cognito-idp')
aws = AWSSRP(username='username', password='password', pool_id='user_pool_id',
client_id='client_id', client=client, device_key="device_key",
device_group_key="device_group_key", device_password="device_password")
tokens = aws.authenticate_user()
```
### Forget Device

To forget device, you can call the `forget_device` class method. It takes `access_token` and `device_key` as input.

```python
resonse = aws.forget_device(access_token='access_token', device_key='device_key')
```

## SRP Requests Authenticator

`pycognito.utils.RequestsSrpAuth` is a [Requests](https://docs.python-requests.org/en/latest/)
Expand Down
167 changes: 166 additions & 1 deletion pycognito/aws_srp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import hashlib
import hmac
import os
import re
import platform
import requests
import json

import boto3

Expand Down Expand Up @@ -116,13 +120,36 @@ def calculate_u(big_a, big_b):
u_hex_hash = hex_hash(pad_hex(big_a) + pad_hex(big_b))
return hex_to_long(u_hex_hash)

def generate_hash_device(device_group_key, device_key):
# source: https://github.com/amazon-archives/amazon-cognito-identity-js/blob/6b87f1a30a998072b4d98facb49dcaf8780d15b0/src/AuthenticationHelper.js#L137

# random device password, which will be used for DEVICE_SRP_AUTH flow
device_password = base64.standard_b64encode(os.urandom(40)).decode('utf-8')

combined_string = '%s%s:%s' % (device_group_key, device_key, device_password)
combined_string_hash = hash_sha256(combined_string.encode('utf-8'))
salt = pad_hex(get_random(16))

x_value = hex_to_long(hex_hash(salt + combined_string_hash))
g = hex_to_long(G_HEX)
big_n = hex_to_long(N_HEX)
verifier_device_not_padded = pow(g, x_value, big_n)
verifier = pad_hex(verifier_device_not_padded)

device_secret_verifier_config = {
"PasswordVerifier": base64.standard_b64encode(bytearray.fromhex(verifier)).decode('utf-8'),
"Salt": base64.standard_b64encode(bytearray.fromhex(salt)).decode('utf-8')
}
return device_password, device_secret_verifier_config

class AWSSRP:

SMS_MFA_CHALLENGE = "SMS_MFA"
SOFTWARE_TOKEN_MFA_CHALLENGE = "SOFTWARE_TOKEN_MFA"
NEW_PASSWORD_REQUIRED_CHALLENGE = "NEW_PASSWORD_REQUIRED"
PASSWORD_VERIFIER_CHALLENGE = "PASSWORD_VERIFIER"
DEVICE_SRP_CHALLENGE = "DEVICE_SRP_AUTH"
DEVICE_PASSWORD_VERIFIER_CHALLENGE = "DEVICE_PASSWORD_VERIFIER"

def __init__(
self,
Expand All @@ -133,12 +160,20 @@ def __init__(
pool_region=None,
client=None,
client_secret=None,
):
device_key=None,
device_group_key=None,
device_password=None,
):
if pool_region is not None and client is not None:
raise ValueError(
"pool_region and client should not both be specified "
"(region should be passed to the boto3 client instead)"
)
if device_key is not None or device_group_key is not None or device_password is not None:
if device_key is None or device_group_key is None or device_password is None:
raise ValueError(
"Either all device_key, device_group_key, and device_password should be specified or none at all "
)

self.username = username
self.password = password
Expand All @@ -148,6 +183,9 @@ def __init__(
self.client = (
client if client else boto3.client("cognito-idp", region_name=pool_region)
)
self.device_key = device_key
self.device_group_key = device_group_key
self.device_password = device_password
self.big_n = hex_to_long(N_HEX)
self.val_g = hex_to_long(G_HEX)
self.val_k = hex_to_long(hex_hash("00" + N_HEX + "0" + G_HEX))
Expand Down Expand Up @@ -199,6 +237,20 @@ def get_password_authentication_key(self, username, password, server_b_value, sa
bytearray.fromhex(pad_hex(long_to_hex(u_value))),
)
return hkdf

def get_device_authentication_key(self, device_group_key, device_key, device_password, server_b_value, salt):
u_value = calculate_u(self.large_a_value, server_b_value)
if u_value == 0:
raise ValueError('U cannot be zero.')
username_password = '%s%s:%s' % (device_group_key, device_key, device_password)
username_password_hash = hash_sha256(username_password.encode('utf-8'))

x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash))
g_mod_pow_xn = pow(self.val_g, x_value, self.big_n)
int_value2 = server_b_value - self.val_k * g_mod_pow_xn
s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n)
hkdf = compute_hkdf(bytearray.fromhex(pad_hex(s_value)), bytearray.fromhex(pad_hex(long_to_hex(u_value))))
return hkdf

def get_auth_params(self):
auth_params = {
Expand All @@ -213,6 +265,12 @@ def get_auth_params(self):
)
}
)
if self.device_key is not None:
auth_params.update(
{
"DEVICE_KEY": self.device_key
}
)
return auth_params

@staticmethod
Expand Down Expand Up @@ -261,6 +319,35 @@ def process_challenge(self, challenge_parameters, request_parameters):
)
}
)
if self.device_key is not None:
response.update(
{
"DEVICE_KEY": self.device_key
}
)
return response

def process_device_challenge(self, challenge_parameters):
username = challenge_parameters['USERNAME']
salt_hex = challenge_parameters['SALT']
srp_b_hex = challenge_parameters['SRP_B']
secret_block_b64 = challenge_parameters['SECRET_BLOCK']
# re strips leading zero from a day number (required by AWS Cognito)
timestamp = re.sub(r" 0(\d) ", r" \1 ", datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y"))
hkdf = self.get_device_authentication_key(self.device_group_key, self.device_key, self.device_password, hex_to_long(srp_b_hex), salt_hex)
secret_block_bytes = base64.standard_b64decode(secret_block_b64)
msg = bytearray(self.device_group_key, 'utf-8') + bytearray(self.device_key, 'utf-8') + bytearray(secret_block_bytes) + bytearray(timestamp, 'utf-8')
hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256)
signature_string = base64.standard_b64encode(hmac_obj.digest())
response = {'TIMESTAMP': timestamp,
'USERNAME': username,
'PASSWORD_CLAIM_SECRET_BLOCK': secret_block_b64,
'PASSWORD_CLAIM_SIGNATURE': signature_string.decode('utf-8'),
'DEVICE_KEY': self.device_key}
if self.client_secret is not None:
response.update({
"SECRET_HASH":
self.get_secret_hash(username, self.client_id, self.client_secret)})
return response

def authenticate_user(self, client=None, client_metadata=None):
Expand All @@ -281,6 +368,24 @@ def authenticate_user(self, client=None, client_metadata=None):
ChallengeResponses=challenge_response,
**dict(ClientMetadata=client_metadata) if client_metadata else {},
)
if tokens.get("ChallengeName") == self.DEVICE_SRP_CHALLENGE:
challenge_response = {
"USERNAME": self.username,
"DEVICE_KEY": self.device_key,
"SRP_A": long_to_hex(self.large_a_value),
}
response = boto_client.respond_to_auth_challenge(
ClientId=self.client_id,
ChallengeName="DEVICE_SRP_AUTH",
ChallengeResponses=challenge_response
)
challenge_response = self.process_device_challenge(response["ChallengeParameters"])
tokens = boto_client.respond_to_auth_challenge(
ClientId=self.client_id,
ChallengeName="DEVICE_PASSWORD_VERIFIER",
ChallengeResponses=challenge_response
)
return tokens

if tokens.get("ChallengeName") == self.NEW_PASSWORD_REQUIRED_CHALLENGE:
raise ForceChangePasswordException(
Expand Down Expand Up @@ -339,3 +444,63 @@ def set_new_password_challenge(self, new_password, client=None):
raise NotImplementedError(
f"The {response['ChallengeName']} challenge is not supported"
)

def confirm_device(self, tokens, device_name=None):
self.access_token = tokens["AuthenticationResult"]["AccessToken"]
self.device_key = tokens["AuthenticationResult"]["NewDeviceMetadata"]["DeviceKey"]
self.device_group_key = tokens["AuthenticationResult"]["NewDeviceMetadata"]["DeviceGroupKey"]
self.device_name = device_name
self.cognito_idp_url = f"https://cognito-idp.{self.pool_id.split('_')[0]}.amazonaws.com/"
device_password, device_secret_verifier_config = generate_hash_device(self.device_group_key, self.device_key)
if device_name is None:
device_name = platform.node()
headers = {
'Authorization': f"Bearer {self.access_token}",
'Content-Type': 'application/x-amz-json-1.1',
'X-Amz-Target': 'AWSCognitoIdentityProviderService.ConfirmDevice',
}
data = {
"AccessToken": self.access_token,
"DeviceKey": self.device_key,
"DeviceName": device_name,
"DeviceSecretVerifierConfig": device_secret_verifier_config
}
response = requests.post(self.cognito_idp_url, headers=headers, data=json.dumps(data))
return response, device_password

def update_device_status(self, isRemembered, access_token, device_key):
self.cognito_idp_url = f"https://cognito-idp.{self.pool_id.split('_')[0]}.amazonaws.com/"
self.access_token = access_token
self.device_key = device_key
headers = {
'Authorization': f"Bearer {self.access_token}",
'Content-Type': 'application/x-amz-json-1.1',
'X-Amz-Target': 'AWSCognitoIdentityProviderService.UpdateDeviceStatus',
}
if isRemembered == True:
status = "remembered"
elif isRemembered == False:
status = "not_remembered"
data = {
"AccessToken": self.access_token,
"DeviceKey": self.device_key,
"DeviceRememberedStatus": status
}
response = requests.post(self.cognito_idp_url, headers=headers, data=json.dumps(data))
return f"{response} : {response.json}"

def forget_device(self, access_token, device_key):
self.cognito_idp_url = f"https://cognito-idp.{self.pool_id.split('_')[0]}.amazonaws.com/"
self.access_token = access_token
self.device_key = device_key
headers = {
'Authorization': f"Bearer {self.access_token}",
'Content-Type': 'application/x-amz-json-1.1',
'X-Amz-Target': 'AWSCognitoIdentityProviderService.ForgetDevice',
}
data = {
"AccessToken": self.access_token,
"DeviceKey": self.device_key
}
response = requests.post(self.cognito_idp_url, headers=headers, data=json.dumps(data))
return f"{response} : {response.json}"

0 comments on commit 2590f2f

Please sign in to comment.