Userpool UsernameAttributes (#4262)

This commit is contained in:
Leo Thomas 2021-10-20 07:56:45 -04:00 committed by GitHub
parent 766f9ffc0d
commit 46131e0340
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 573 additions and 45 deletions

View File

@ -18,7 +18,14 @@ from .exceptions import (
UserNotConfirmedException,
InvalidParameterException,
)
from .utils import create_id, check_secret_hash, PAGINATION_MODEL
from .utils import (
create_id,
check_secret_hash,
validate_username_format,
flatten_attrs,
expand_attrs,
PAGINATION_MODEL,
)
from moto.utilities.paginator import paginate
UserStatus = {
@ -82,6 +89,20 @@ class CognitoIdpUserPool(BaseModel):
return user_pool_json
def _get_user(self, username):
"""Find a user within a user pool by Username or any UsernameAttributes
(`email` or `phone_number` or both)"""
if self.extended_config.get("UsernameAttributes"):
attribute_types = self.extended_config["UsernameAttributes"]
for user in self.users.values():
if username in [
flatten_attrs(user.attributes).get(attribute_type)
for attribute_type in attribute_types
]:
return user
return self.users.get(username)
def create_jwt(
self, client_id, username, token_use, expires_in=60 * 60, extra_data={}
):
@ -90,12 +111,12 @@ class CognitoIdpUserPool(BaseModel):
"iss": "https://cognito-idp.{}.amazonaws.com/{}".format(
self.region, self.id
),
"sub": self.users[username].id,
"sub": self._get_user(username).id,
"aud": client_id,
"token_use": token_use,
"auth_time": now,
"exp": now + expires_in,
"email": self.users[username].username,
"email": flatten_attrs(self._get_user(username).attributes).get("email"),
}
payload.update(extra_data)
headers = {"kid": "dummy"} # KID as present in jwks-public.json
@ -140,7 +161,7 @@ class CognitoIdpUserPool(BaseModel):
attribute = list(
filter(
lambda f: f["Name"] == readable_field,
self.users.get(username).attributes,
self._get_user(username).attributes,
)
)
if len(attribute) > 0:
@ -261,12 +282,14 @@ class CognitoIdpUser(BaseModel):
def __init__(self, user_pool_id, username, password, status, attributes):
self.id = str(uuid.uuid4())
self.user_pool_id = user_pool_id
self.username = username
# Username is None when users sign up with an email or phone_number,
# and should be given the value of the internal id generate (sub)
self.username = username if username else self.id
self.password = password
self.status = status
self.enabled = True
self.attributes = attributes
self.attribute_lookup = self._flatten_attributes(attributes)
self.attribute_lookup = flatten_attrs(attributes)
self.create_date = datetime.datetime.utcnow()
self.last_modified_date = datetime.datetime.utcnow()
self.sms_mfa_enabled = False
@ -277,6 +300,8 @@ class CognitoIdpUser(BaseModel):
# Note that these links are bidirectional.
self.groups = set()
self.update_attributes([{"Name": "sub", "Value": self.id}])
def _base_json(self):
return {
"UserPoolId": self.user_pool_id,
@ -306,15 +331,9 @@ class CognitoIdpUser(BaseModel):
return user_json
def _flatten_attributes(self, attributes):
return {attr["Name"]: attr["Value"] for attr in attributes}
def update_attributes(self, new_attributes):
def expand_attrs(attrs):
return [{"Name": k, "Value": v} for k, v in attrs.items()]
flat_attributes = self._flatten_attributes(self.attributes)
flat_attributes.update(self._flatten_attributes(new_attributes))
flat_attributes = flatten_attrs(self.attributes)
flat_attributes.update(flatten_attrs(new_attributes))
self.attribute_lookup = flat_attributes
self.attributes = expand_attrs(flat_attributes)
@ -630,11 +649,52 @@ class CognitoIdpBackend(BaseBackend):
raise ResourceNotFoundError(user_pool_id)
if message_action and message_action == "RESEND":
if username not in user_pool.users:
if not user_pool._get_user(username):
raise UserNotFoundError(username)
elif username in user_pool.users:
elif user_pool._get_user(username):
raise UsernameExistsException(username)
# UsernameAttributes are attributes (either `email` or `phone_number`
# or both) than can be used in the place of a unique username. If the
# user provides an email or phone number when signing up, the user pool
# performs the following steps:
# 1. populates the correct field (email, phone_number) with the value
# supplied for Username
# 2. generates a persistent GUID for the user that will be returned as
# the value of `Username` in the `get-user` and `list-users`
# operations, as well as the value of `sub` in `IdToken` and
# `AccessToken`
#
# ref: https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-attributes.html#user-pool-settings-aliases-settings
if user_pool.extended_config.get("UsernameAttributes"):
username_attributes = user_pool.extended_config["UsernameAttributes"]
# attribute_type should be one of `email`, `phone_number` or both
for attribute_type in username_attributes:
# check if provided username matches one of the attribute types in
# `UsernameAttributes`
if attribute_type in username_attributes and validate_username_format(
username, _format=attribute_type
):
# insert provided username into new user's attributes under the
# correct key
flattened_attrs = flatten_attrs(attributes or {})
flattened_attrs.update({attribute_type: username})
attributes = expand_attrs(flattened_attrs)
# set username to None so that it will be default to the internal GUID
# when them user gets created
username = None
# once the username has been validated against a username attribute
# type, there is no need to attempt validation against the other
# type(s)
break
# The provided username has not matched the required format for any
# of the possible attributes
if username is not None:
raise InvalidParameterException(
"Username should be either an email or a phone number."
)
user = CognitoIdpUser(
user_pool_id,
username,
@ -651,16 +711,16 @@ class CognitoIdpBackend(BaseBackend):
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
if username not in user_pool.users:
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
return user_pool.users[username]
return user
def get_user(self, access_token):
for user_pool in self.user_pools.values():
if access_token in user_pool.access_tokens:
_, username = user_pool.access_tokens[access_token]
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if (
not user
or not user.enabled
@ -691,14 +751,15 @@ class CognitoIdpBackend(BaseBackend):
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
if username not in user_pool.users:
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
user = user_pool.users[username]
for group in user.groups:
group.users.remove(user)
del user_pool.users[username]
# use internal username
del user_pool.users[user.username]
def _log_user_in(self, user_pool, client, username):
refresh_token = user_pool.create_refresh_token(client.id, username)
@ -727,7 +788,7 @@ class CognitoIdpBackend(BaseBackend):
if auth_flow in ("ADMIN_USER_PASSWORD_AUTH", "ADMIN_NO_SRP_AUTH"):
username = auth_parameters.get("USERNAME")
password = auth_parameters.get("PASSWORD")
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -783,7 +844,7 @@ class CognitoIdpBackend(BaseBackend):
if challenge_name == "NEW_PASSWORD_REQUIRED":
username = challenge_responses.get("USERNAME")
new_password = challenge_responses.get("NEW_PASSWORD")
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -794,7 +855,7 @@ class CognitoIdpBackend(BaseBackend):
return self._log_user_in(user_pool, client, username)
elif challenge_name == "PASSWORD_VERIFIER":
username = challenge_responses.get("USERNAME")
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -830,7 +891,7 @@ class CognitoIdpBackend(BaseBackend):
return self._log_user_in(user_pool, client, username)
elif challenge_name == "SOFTWARE_TOKEN_MFA":
username = challenge_responses.get("USERNAME")
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -853,8 +914,8 @@ class CognitoIdpBackend(BaseBackend):
def confirm_forgot_password(self, client_id, username, password):
for user_pool in self.user_pools.values():
if client_id in user_pool.clients and username in user_pool.users:
user_pool.users[username].password = password
if client_id in user_pool.clients and user_pool._get_user(username):
user_pool._get_user(username).password = password
break
else:
raise ResourceNotFoundError(client_id)
@ -863,7 +924,7 @@ class CognitoIdpBackend(BaseBackend):
for user_pool in self.user_pools.values():
if access_token in user_pool.access_tokens:
_, username = user_pool.access_tokens[access_token]
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -886,10 +947,10 @@ class CognitoIdpBackend(BaseBackend):
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
if username not in user_pool.users:
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
user = user_pool.users[username]
user.update_attributes(attributes)
def admin_user_global_sign_out(self, user_pool_id, username):
@ -897,7 +958,8 @@ class CognitoIdpBackend(BaseBackend):
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
if username not in user_pool.users:
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
for token, token_tuple in list(user_pool.refresh_tokens.items()):
@ -926,9 +988,50 @@ class CognitoIdpBackend(BaseBackend):
user_pool = p
if user_pool is None:
raise ResourceNotFoundError(client_id)
elif username in user_pool.users:
elif user_pool._get_user(username):
raise UsernameExistsException(username)
# UsernameAttributes are attributes (either `email` or `phone_number`
# or both) than can be used in the place of a unique username. If the
# user provides an email or phone number when signing up, the user pool
# performs the following steps:
# 1. populates the correct field (email, phone_number) with the value
# supplied for Username
# 2. generates a persistent GUID for the user that will be returned as
# the value of `Username` in the `get-user` and `list-users`
# operations, as well as the value of `sub` in `IdToken` and
# `AccessToken`
#
# ref: https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-attributes.html#user-pool-settings-aliases-settings
if user_pool.extended_config.get("UsernameAttributes"):
username_attributes = user_pool.extended_config["UsernameAttributes"]
# attribute_type should be one of `email`, `phone_number` or both
for attribute_type in username_attributes:
# check if provided username matches one of the attribute types in
# `UsernameAttributes`
if attribute_type in username_attributes and validate_username_format(
username, _format=attribute_type
):
# insert provided username into new user's attributes under the
# correct key
flattened_attrs = flatten_attrs(attributes or {})
flattened_attrs.update({attribute_type: username})
attributes = expand_attrs(flattened_attrs)
# set username to None so that it will be default to the internal GUID
# when them user gets created
username = None
# once the username has been validated against a username attribute
# type, there is no need to attempt validation against the other
# type(s)
break
# The provided username has not matched the required format for any
# of the possible attributes
if username is not None:
raise InvalidParameterException(
"Username should be either an email or a phone number."
)
user = CognitoIdpUser(
user_pool_id=user_pool.id,
username=username,
@ -947,10 +1050,10 @@ class CognitoIdpBackend(BaseBackend):
if user_pool is None:
raise ResourceNotFoundError(client_id)
if username not in user_pool.users:
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
user = user_pool.users[username]
user.status = UserStatus["CONFIRMED"]
return ""
@ -976,7 +1079,7 @@ class CognitoIdpBackend(BaseBackend):
):
raise NotAuthorizedError(secret_hash)
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -1001,7 +1104,7 @@ class CognitoIdpBackend(BaseBackend):
username = auth_parameters.get("USERNAME")
password = auth_parameters.get("PASSWORD")
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -1069,7 +1172,7 @@ class CognitoIdpBackend(BaseBackend):
for user_pool in self.user_pools.values():
if access_token in user_pool.access_tokens:
_, username = user_pool.access_tokens[access_token]
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -1081,7 +1184,7 @@ class CognitoIdpBackend(BaseBackend):
for user_pool in self.user_pools.values():
if access_token in user_pool.access_tokens:
_, username = user_pool.access_tokens[access_token]
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)
@ -1097,7 +1200,7 @@ class CognitoIdpBackend(BaseBackend):
for user_pool in self.user_pools.values():
if access_token in user_pool.access_tokens:
_, username = user_pool.access_tokens[access_token]
user = user_pool.users.get(username)
user = user_pool._get_user(username)
if not user:
raise UserNotFoundError(username)

View File

@ -3,6 +3,12 @@ import string
import hashlib
import hmac
import base64
import re
FORMATS = {
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone_number": r"\+\d{,15}",
}
PAGINATION_MODEL = {
@ -45,3 +51,18 @@ def check_secret_hash(app_client_secret, app_client_id, username, secret_hash):
new_digest = hmac.new(key, msg, hashlib.sha256).digest()
SECRET_HASH = base64.b64encode(new_digest).decode()
return SECRET_HASH == secret_hash
def validate_username_format(username, _format="email"):
# if the value of the `_format` param other than `email` or `phone_number`,
# the default value for the regex will match nothing and the
# method will return None
return re.fullmatch(FORMATS.get(_format, r"a^"), username)
def flatten_attrs(attrs):
return {attr["Name"]: attr["Value"] for attr in attrs}
def expand_attrs(attrs):
return [{"Name": k, "Value": v} for k, v in attrs.items()]

View File

@ -913,6 +913,25 @@ def test_admin_add_user_to_group():
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
@mock_cognitoidp
def test_admin_add_user_to_group_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = "test@example.com"
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
result = conn.admin_add_user_to_group(
UserPoolId=user_pool_id, Username=username, GroupName=group_name
)
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
@mock_cognitoidp
def test_admin_add_user_to_group_again_is_noop():
conn = boto3.client("cognito-idp", "us-west-2")
@ -930,6 +949,7 @@ def test_admin_add_user_to_group_again_is_noop():
conn.admin_add_user_to_group(
UserPoolId=user_pool_id, Username=username, GroupName=group_name
)
# should there be an assertion here?
@mock_cognitoidp
@ -1001,6 +1021,29 @@ def test_admin_list_groups_for_user():
result["Groups"][0]["GroupName"].should.equal(group_name)
@mock_cognitoidp
def test_admin_list_groups_for_user_with_username_attribute():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = "test@example.com"
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(
UserPoolId=user_pool_id, Username=username, GroupName=group_name
)
result = conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id)
result["Groups"].should.have.length_of(1)
result["Groups"][0]["GroupName"].should.equal(group_name)
@mock_cognitoidp
def test_admin_list_groups_for_user_ignores_deleted_group():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1055,6 +1098,35 @@ def test_admin_remove_user_from_group():
].should.have.length_of(0)
@mock_cognitoidp
def test_admin_remove_user_from_group_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = "test@example.com"
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(
UserPoolId=user_pool_id, Username=username, GroupName=group_name
)
result = conn.admin_remove_user_from_group(
UserPoolId=user_pool_id, Username=username, GroupName=group_name
)
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name)[
"Users"
].should.have.length_of(0)
conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id)[
"Groups"
].should.have.length_of(0)
@mock_cognitoidp
def test_admin_remove_user_from_group_again_is_noop():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1089,7 +1161,7 @@ def test_admin_create_user():
result["User"]["Username"].should.equal(username)
result["User"]["UserStatus"].should.equal("FORCE_CHANGE_PASSWORD")
result["User"]["Attributes"].should.have.length_of(1)
result["User"]["Attributes"].should.have.length_of(2)
def _verify_attribute(name, v):
attr = [a for a in result["User"]["Attributes"] if a["Name"] == name]
@ -1100,6 +1172,84 @@ def test_admin_create_user():
result["User"]["Enabled"].should.equal(True)
@mock_cognitoidp
def test_admin_create_user_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
value = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
result = conn.admin_create_user(
UserPoolId=user_pool_id,
Username=username,
UserAttributes=[{"Name": "thing", "Value": value}],
)
result["User"]["Username"].should_not.equal(username)
result["User"]["UserStatus"].should.equal("FORCE_CHANGE_PASSWORD")
result["User"]["Attributes"].should.have.length_of(3)
def _verify_attribute(name, v):
attr = [a for a in result["User"]["Attributes"] if a["Name"] == name]
attr.should.have.length_of(1)
attr[0]["Value"].should.equal(v)
_verify_attribute("thing", value)
_verify_attribute("email", username)
result["User"]["Enabled"].should.equal(True)
@mock_cognitoidp
def test_admin_create_user_with_incorrect_username_attribute_type_fails():
conn = boto3.client("cognito-idp", "us-west-2")
value = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
with pytest.raises(ClientError) as ex:
username = str(uuid.uuid4())
conn.admin_create_user(
UserPoolId=user_pool_id,
Username=username,
UserAttributes=[{"Name": "thing", "Value": value}],
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterException")
err["Message"].should.equal("Username should be either an email or a phone number.")
@mock_cognitoidp
def test_admin_create_user_with_existing_username_attribute_fails():
conn = boto3.client("cognito-idp", "us-west-2")
value = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
username = "test@example.com"
conn.admin_create_user(
UserPoolId=user_pool_id,
Username=username,
UserAttributes=[{"Name": "thing", "Value": value}],
)
with pytest.raises(ClientError) as ex:
username = "test@example.com"
conn.admin_create_user(
UserPoolId=user_pool_id,
Username=username,
UserAttributes=[{"Name": "thing", "Value": value}],
)
err = ex.value.response["Error"]
err["Code"].should.equal("UsernameExistsException")
err["Message"].should.equal("test@example.com")
@mock_cognitoidp
def test_admin_create_existing_user():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1190,7 +1340,60 @@ def test_admin_get_user():
result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["Username"].should.equal(username)
result["UserAttributes"].should.have.length_of(1)
result["UserAttributes"].should.have.length_of(2)
@mock_cognitoidp
def test_admin_get_user_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
value = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email", "phone_number"]
)["UserPool"]["Id"]
conn.admin_create_user(
UserPoolId=user_pool_id,
Username=username,
UserAttributes=[
{"Name": "thing", "Value": value},
{"Name": "phone_number", "Value": "+123456789"},
],
)
# verify user can be queried by email
result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["Username"].should_not.equal(username)
result["UserAttributes"].should.have.length_of(4)
def _verify_attribute(name, v):
attr = [a for a in result["UserAttributes"] if a["Name"] == name]
attr.should.have.length_of(1)
attr[0]["Value"].should.equal(v)
_verify_attribute("phone_number", "+123456789")
_verify_attribute("email", "test@example.com")
# verify user can be queried by phone number
result = conn.admin_get_user(UserPoolId=user_pool_id, Username="+123456789")
result["Username"].should_not.equal(username)
result["UserAttributes"].should.have.length_of(4)
_verify_attribute("phone_number", "+123456789")
_verify_attribute("email", "test@example.com")
# verify that the generate user sub is a valid UUID v4
[user_sub] = [
attr["Value"] for attr in result["UserAttributes"] if attr["Name"] == "sub"
]
uuid.UUID(user_sub)
# verify user should be queried by user sub
result = conn.admin_get_user(UserPoolId=user_pool_id, Username=user_sub)
result["Username"].should_not.equal(username)
result["UserAttributes"].should.have.length_of(4)
_verify_attribute("phone_number", "+123456789")
_verify_attribute("email", "test@example.com")
@mock_cognitoidp
@ -1209,13 +1412,29 @@ def test_admin_get_missing_user():
caught.should.be.true
@mock_cognitoidp
def test_admin_get_missing_user_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
with pytest.raises(ClientError) as ex:
conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
err = ex.value.response["Error"]
err["Code"].should.equal("UserNotFoundException")
@mock_cognitoidp
def test_get_user():
conn = boto3.client("cognito-idp", "us-west-2")
outputs = authentication_flow(conn, "ADMIN_NO_SRP_AUTH")
result = conn.get_user(AccessToken=outputs["access_token"])
result["Username"].should.equal(outputs["username"])
result["UserAttributes"].should.have.length_of(1)
result["UserAttributes"].should.have.length_of(2)
def _verify_attribute(name, v):
attr = [a for a in result["UserAttributes"] if a["Name"] == name]
@ -1317,6 +1536,50 @@ def test_list_users_invalid_attributes():
assert err["Message"].should.equal("Invalid search attribute: custom:foo")
@mock_cognitoidp
def test_list_users_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
result = conn.list_users(UserPoolId=user_pool_id)
result["Users"].should.have.length_of(1)
result["Users"][0]["Username"].should_not.equal(username)
def _verify_attribute(name, v):
attr = [a for a in result["Users"][0]["Attributes"] if a["Name"] == name]
attr.should.have.length_of(1)
attr[0]["Value"].should.equal(v)
_verify_attribute("email", username)
username_bis = "test2@uexample.com"
conn.admin_create_user(
UserPoolId=user_pool_id,
Username=username_bis,
UserAttributes=[{"Name": "phone_number", "Value": "+33666666666"}],
)
result = conn.list_users(
UserPoolId=user_pool_id, Filter='phone_number="+33666666666"'
)
result["Users"].should.have.length_of(1)
result["Users"][0]["Username"].should_not.equal(username_bis)
uuid.UUID(result["Users"][0]["Username"])
_verify_attribute("email", username_bis)
# checking Filter with space
result = conn.list_users(
UserPoolId=user_pool_id, Filter='phone_number = "+33666666666"'
)
result["Users"].should.have.length_of(1)
result["Users"][0]["Username"].should_not.equal(username_bis)
_verify_attribute("email", username_bis)
@mock_cognitoidp
def test_list_users_inherent_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1442,6 +1705,24 @@ def test_admin_disable_user():
].should.equal(False)
@mock_cognitoidp
def test_admin_disable_user_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
result = conn.admin_disable_user(UserPoolId=user_pool_id, Username=username)
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
conn.admin_get_user(UserPoolId=user_pool_id, Username=username)[
"Enabled"
].should.equal(False)
@mock_cognitoidp
def test_admin_enable_user():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1459,6 +1740,25 @@ def test_admin_enable_user():
].should.equal(True)
@mock_cognitoidp
def test_admin_enable_user_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_disable_user(UserPoolId=user_pool_id, Username=username)
result = conn.admin_enable_user(UserPoolId=user_pool_id, Username=username)
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
conn.admin_get_user(UserPoolId=user_pool_id, Username=username)[
"Enabled"
].should.equal(True)
@mock_cognitoidp
def test_admin_delete_user():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1477,6 +1777,24 @@ def test_admin_delete_user():
caught.should.be.true
@mock_cognitoidp
def test_admin_delete_user_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_delete_user(UserPoolId=user_pool_id, Username=username)
with pytest.raises(ClientError) as ex:
conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
err = ex.value.response["Error"]
err["Code"].should.equal("UserNotFoundException")
def authentication_flow(conn, auth_flow):
username = str(uuid.uuid4())
temporary_password = str(uuid.uuid4())
@ -1927,6 +2245,34 @@ def test_sign_up():
result["UserSub"].should_not.be.none
@mock_cognitoidp
def test_sign_up_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email", "phone_number"]
)["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()),
)["UserPoolClient"]["ClientId"]
username = str(uuid.uuid4())
password = str(uuid.uuid4())
with pytest.raises(ClientError) as err:
# Attempt to add user again
result = conn.sign_up(ClientId=client_id, Username=username, Password=password)
err.value.response["Error"]["Code"].should.equal("InvalidParameterException")
username = "test@example.com"
result = conn.sign_up(ClientId=client_id, Username=username, Password=password)
result["UserConfirmed"].should.be.false
result["UserSub"].should_not.be.none
username = "+123456789"
result = conn.sign_up(ClientId=client_id, Username=username, Password=password)
result["UserConfirmed"].should.be.false
result["UserSub"].should_not.be.none
@mock_cognitoidp
def test_sign_up_existing_user():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1966,6 +2312,27 @@ def test_confirm_sign_up():
result["UserStatus"].should.equal("CONFIRMED")
@mock_cognitoidp
def test_confirm_sign_up_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
password = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
)["UserPoolClient"]["ClientId"]
conn.sign_up(ClientId=client_id, Username=username, Password=password)
conn.confirm_sign_up(
ClientId=client_id, Username=username, ConfirmationCode="123456",
)
result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["UserStatus"].should.equal("CONFIRMED")
@mock_cognitoidp
def test_initiate_auth_USER_SRP_AUTH():
conn = boto3.client("cognito-idp", "us-west-2")
@ -2001,6 +2368,43 @@ def test_initiate_auth_USER_SRP_AUTH():
result["ChallengeName"].should.equal("PASSWORD_VERIFIER")
@mock_cognitoidp
def test_initiate_auth_USER_SRP_AUTH_with_username_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
username = "test@example.com"
password = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
)["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
)["UserPoolClient"]["ClientId"]
conn.sign_up(ClientId=client_id, Username=username, Password=password)
client_secret = conn.describe_user_pool_client(
UserPoolId=user_pool_id, ClientId=client_id,
)["UserPoolClient"]["ClientSecret"]
conn.confirm_sign_up(
ClientId=client_id, Username=username, ConfirmationCode="123456",
)
key = bytes(str(client_secret).encode("latin-1"))
msg = bytes(str(username + client_id).encode("latin-1"))
new_digest = hmac.new(key, msg, hashlib.sha256).digest()
secret_hash = base64.b64encode(new_digest).decode()
result = conn.initiate_auth(
ClientId=client_id,
AuthFlow="USER_SRP_AUTH",
AuthParameters={
"USERNAME": username,
"SRP_A": uuid.uuid4().hex,
"SECRET_HASH": secret_hash,
},
)
result["ChallengeName"].should.equal("PASSWORD_VERIFIER")
@mock_cognitoidp
def test_initiate_auth_REFRESH_TOKEN():
conn = boto3.client("cognito-idp", "us-west-2")
@ -2260,7 +2664,7 @@ def test_admin_set_user_password():
)
result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["Username"].should.equal(username)
result["UserAttributes"].should.have.length_of(1)
result["UserAttributes"].should.have.length_of(2)
def _verify_attribute(name, v):
attr = [a for a in result["UserAttributes"] if a["Name"] == name]