Skip to content

Commit

Permalink
add cognito user lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
torbjokv committed Sep 20, 2024
1 parent 926ddca commit 2178afa
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 2 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ python3 roles.py help
It will return a list of items where at least one attribute contains the given value.
The items are returned as a list of dictionaries with 'PrimaryKeyHashKey', 'givenName',
and 'familyName' as keys.
- 'clookup' to perform a lookup directly in cognito user pool.
Use it as: python3 roles.py clookup [value]
This will return a list of users where at least one attribute contains the given value.
The users are returned as a serialized JSON string.
- 'help' to see this message again.
```

Expand Down
72 changes: 71 additions & 1 deletion roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ def get_table_name():

raise ValueError('No valid table found.')

def get_user_pool_id():
client = boto3.client('ssm')
parameter_name = 'CognitoUserPoolId'

response = client.get_parameter(
Name=parameter_name,
WithDecryption=True
)

return response['Parameter']['Value']

def get_user_roles(key):
table_name = get_table_name()

Expand All @@ -40,6 +51,41 @@ def get_user_roles(key):

return roles

def get_all_users(user_pool_id):
cognito = boto3.client('cognito-idp')
pagination_token = None
users = []

while True:
if pagination_token:
response = cognito.list_users(UserPoolId=user_pool_id, PaginationToken=pagination_token)
else:
response = cognito.list_users(UserPoolId=user_pool_id)

users.extend(response['Users'])

pagination_token = response.get('PaginationToken')
if not pagination_token:
break

return users

def lookup_user_by_custom_attr(nvaUsername, users):
for user in users:
for attribute in user['Attributes']:
if attribute['Name'] == 'custom:nvaUsername' and attribute['Value'] == nvaUsername:
return user['Username']

return None

def lookup_users_by_attribute_value(attribute_value, users):
matches = []
for user in users:
for attribute in user['Attributes']:
if attribute['Value'] == attribute_value:
matches.append(user)
return matches if matches else None

def load_roles_from_file(filename):
with open(filename, 'r') as f:
roles = json.load(f)
Expand All @@ -50,11 +96,15 @@ def load_roles_from_file(filename):
return roles_python

def get_key_name_fields(items):
user_pool_id = get_user_pool_id()
users = get_all_users(user_pool_id)
key_name_fields = [
{
'PrimaryKeyHashKey': item.get('PrimaryKeyHashKey'),
'PrimaryKeyRangeKey': item.get('PrimaryKeyRangeKey'),
'givenName': item.get('givenName'),
'familyName': item.get('familyName')
'familyName': item.get('familyName'),
'cognitoUsername': lookup_user_by_custom_attr(item.get('username'), users)
}
for item in items
]
Expand Down Expand Up @@ -128,6 +178,12 @@ def help():
The items are returned as a list of dictionaries with 'PrimaryKeyHashKey', 'givenName',
and 'familyName' as keys.
- 'clookup' to perform a lookup directly in cognito user pool.
Use it as: python3 roles.py clookup [value]
This will return a list of users where at least one attribute contains the given value.
The users are returned as a serialized JSON string.
- 'help' to see this message again.
"""
print(instructions)
Expand Down Expand Up @@ -174,6 +230,20 @@ def help():


print(json.dumps(names, indent=2, sort_keys=True, default=str, ensure_ascii=False))

elif action == 'clookup':
if len(sys.argv) < 3:
print("Please provide the value to lookup(2)")
sys.exit(1)

value = sys.argv[2]

user_pool_id = get_user_pool_id()
users = get_all_users(user_pool_id)
result = lookup_users_by_attribute_value(value, users)


print(json.dumps(result, indent=2, sort_keys=True, default=str, ensure_ascii=False))

elif action == 'help':
help()
Expand Down

0 comments on commit 2178afa

Please sign in to comment.