diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index cb1a5cf44..c904f06ef 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -18,7 +18,14 @@ from .exceptions import ( UserNotConfirmedException, InvalidParameterException, ) -from .utils import create_id, check_secret_hash, PAGINATION_MODEL +from .utils import ( + create_id, + check_secret_hash, + validate_username_format, + flatten_attrs, + expand_attrs, + PAGINATION_MODEL, +) from moto.utilities.paginator import paginate UserStatus = { @@ -82,6 +89,20 @@ class CognitoIdpUserPool(BaseModel): return user_pool_json + def _get_user(self, username): + """Find a user within a user pool by Username or any UsernameAttributes + (`email` or `phone_number` or both)""" + if self.extended_config.get("UsernameAttributes"): + attribute_types = self.extended_config["UsernameAttributes"] + for user in self.users.values(): + if username in [ + flatten_attrs(user.attributes).get(attribute_type) + for attribute_type in attribute_types + ]: + return user + + return self.users.get(username) + def create_jwt( self, client_id, username, token_use, expires_in=60 * 60, extra_data={} ): @@ -90,12 +111,12 @@ class CognitoIdpUserPool(BaseModel): "iss": "https://cognito-idp.{}.amazonaws.com/{}".format( self.region, self.id ), - "sub": self.users[username].id, + "sub": self._get_user(username).id, "aud": client_id, "token_use": token_use, "auth_time": now, "exp": now + expires_in, - "email": self.users[username].username, + "email": flatten_attrs(self._get_user(username).attributes).get("email"), } payload.update(extra_data) headers = {"kid": "dummy"} # KID as present in jwks-public.json @@ -140,7 +161,7 @@ class CognitoIdpUserPool(BaseModel): attribute = list( filter( lambda f: f["Name"] == readable_field, - self.users.get(username).attributes, + self._get_user(username).attributes, ) ) if len(attribute) > 0: @@ -261,12 +282,14 @@ class CognitoIdpUser(BaseModel): def __init__(self, user_pool_id, username, password, status, attributes): self.id = str(uuid.uuid4()) self.user_pool_id = user_pool_id - self.username = username + # Username is None when users sign up with an email or phone_number, + # and should be given the value of the internal id generate (sub) + self.username = username if username else self.id self.password = password self.status = status self.enabled = True self.attributes = attributes - self.attribute_lookup = self._flatten_attributes(attributes) + self.attribute_lookup = flatten_attrs(attributes) self.create_date = datetime.datetime.utcnow() self.last_modified_date = datetime.datetime.utcnow() self.sms_mfa_enabled = False @@ -277,6 +300,8 @@ class CognitoIdpUser(BaseModel): # Note that these links are bidirectional. self.groups = set() + self.update_attributes([{"Name": "sub", "Value": self.id}]) + def _base_json(self): return { "UserPoolId": self.user_pool_id, @@ -306,15 +331,9 @@ class CognitoIdpUser(BaseModel): return user_json - def _flatten_attributes(self, attributes): - return {attr["Name"]: attr["Value"] for attr in attributes} - def update_attributes(self, new_attributes): - def expand_attrs(attrs): - return [{"Name": k, "Value": v} for k, v in attrs.items()] - - flat_attributes = self._flatten_attributes(self.attributes) - flat_attributes.update(self._flatten_attributes(new_attributes)) + flat_attributes = flatten_attrs(self.attributes) + flat_attributes.update(flatten_attrs(new_attributes)) self.attribute_lookup = flat_attributes self.attributes = expand_attrs(flat_attributes) @@ -630,11 +649,52 @@ class CognitoIdpBackend(BaseBackend): raise ResourceNotFoundError(user_pool_id) if message_action and message_action == "RESEND": - if username not in user_pool.users: + if not user_pool._get_user(username): raise UserNotFoundError(username) - elif username in user_pool.users: + elif user_pool._get_user(username): raise UsernameExistsException(username) + # UsernameAttributes are attributes (either `email` or `phone_number` + # or both) than can be used in the place of a unique username. If the + # user provides an email or phone number when signing up, the user pool + # performs the following steps: + # 1. populates the correct field (email, phone_number) with the value + # supplied for Username + # 2. generates a persistent GUID for the user that will be returned as + # the value of `Username` in the `get-user` and `list-users` + # operations, as well as the value of `sub` in `IdToken` and + # `AccessToken` + # + # ref: https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-attributes.html#user-pool-settings-aliases-settings + if user_pool.extended_config.get("UsernameAttributes"): + username_attributes = user_pool.extended_config["UsernameAttributes"] + # attribute_type should be one of `email`, `phone_number` or both + for attribute_type in username_attributes: + # check if provided username matches one of the attribute types in + # `UsernameAttributes` + if attribute_type in username_attributes and validate_username_format( + username, _format=attribute_type + ): + # insert provided username into new user's attributes under the + # correct key + flattened_attrs = flatten_attrs(attributes or {}) + flattened_attrs.update({attribute_type: username}) + attributes = expand_attrs(flattened_attrs) + # set username to None so that it will be default to the internal GUID + # when them user gets created + username = None + # once the username has been validated against a username attribute + # type, there is no need to attempt validation against the other + # type(s) + break + + # The provided username has not matched the required format for any + # of the possible attributes + if username is not None: + raise InvalidParameterException( + "Username should be either an email or a phone number." + ) + user = CognitoIdpUser( user_pool_id, username, @@ -651,16 +711,16 @@ class CognitoIdpBackend(BaseBackend): if not user_pool: raise ResourceNotFoundError(user_pool_id) - if username not in user_pool.users: + user = user_pool._get_user(username) + if not user: raise UserNotFoundError(username) - - return user_pool.users[username] + return user def get_user(self, access_token): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool.users.get(username) + user = user_pool._get_user(username) if ( not user or not user.enabled @@ -691,14 +751,15 @@ class CognitoIdpBackend(BaseBackend): if not user_pool: raise ResourceNotFoundError(user_pool_id) - if username not in user_pool.users: + user = user_pool._get_user(username) + if not user: raise UserNotFoundError(username) - user = user_pool.users[username] for group in user.groups: group.users.remove(user) - del user_pool.users[username] + # use internal username + del user_pool.users[user.username] def _log_user_in(self, user_pool, client, username): refresh_token = user_pool.create_refresh_token(client.id, username) @@ -727,7 +788,7 @@ class CognitoIdpBackend(BaseBackend): if auth_flow in ("ADMIN_USER_PASSWORD_AUTH", "ADMIN_NO_SRP_AUTH"): username = auth_parameters.get("USERNAME") password = auth_parameters.get("PASSWORD") - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -783,7 +844,7 @@ class CognitoIdpBackend(BaseBackend): if challenge_name == "NEW_PASSWORD_REQUIRED": username = challenge_responses.get("USERNAME") new_password = challenge_responses.get("NEW_PASSWORD") - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -794,7 +855,7 @@ class CognitoIdpBackend(BaseBackend): return self._log_user_in(user_pool, client, username) elif challenge_name == "PASSWORD_VERIFIER": username = challenge_responses.get("USERNAME") - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -830,7 +891,7 @@ class CognitoIdpBackend(BaseBackend): return self._log_user_in(user_pool, client, username) elif challenge_name == "SOFTWARE_TOKEN_MFA": username = challenge_responses.get("USERNAME") - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -853,8 +914,8 @@ class CognitoIdpBackend(BaseBackend): def confirm_forgot_password(self, client_id, username, password): for user_pool in self.user_pools.values(): - if client_id in user_pool.clients and username in user_pool.users: - user_pool.users[username].password = password + if client_id in user_pool.clients and user_pool._get_user(username): + user_pool._get_user(username).password = password break else: raise ResourceNotFoundError(client_id) @@ -863,7 +924,7 @@ class CognitoIdpBackend(BaseBackend): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -886,10 +947,10 @@ class CognitoIdpBackend(BaseBackend): if not user_pool: raise ResourceNotFoundError(user_pool_id) - if username not in user_pool.users: + user = user_pool._get_user(username) + if not user: raise UserNotFoundError(username) - user = user_pool.users[username] user.update_attributes(attributes) def admin_user_global_sign_out(self, user_pool_id, username): @@ -897,7 +958,8 @@ class CognitoIdpBackend(BaseBackend): if not user_pool: raise ResourceNotFoundError(user_pool_id) - if username not in user_pool.users: + user = user_pool._get_user(username) + if not user: raise UserNotFoundError(username) for token, token_tuple in list(user_pool.refresh_tokens.items()): @@ -926,9 +988,50 @@ class CognitoIdpBackend(BaseBackend): user_pool = p if user_pool is None: raise ResourceNotFoundError(client_id) - elif username in user_pool.users: + elif user_pool._get_user(username): raise UsernameExistsException(username) + # UsernameAttributes are attributes (either `email` or `phone_number` + # or both) than can be used in the place of a unique username. If the + # user provides an email or phone number when signing up, the user pool + # performs the following steps: + # 1. populates the correct field (email, phone_number) with the value + # supplied for Username + # 2. generates a persistent GUID for the user that will be returned as + # the value of `Username` in the `get-user` and `list-users` + # operations, as well as the value of `sub` in `IdToken` and + # `AccessToken` + # + # ref: https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-attributes.html#user-pool-settings-aliases-settings + if user_pool.extended_config.get("UsernameAttributes"): + username_attributes = user_pool.extended_config["UsernameAttributes"] + # attribute_type should be one of `email`, `phone_number` or both + for attribute_type in username_attributes: + # check if provided username matches one of the attribute types in + # `UsernameAttributes` + if attribute_type in username_attributes and validate_username_format( + username, _format=attribute_type + ): + # insert provided username into new user's attributes under the + # correct key + flattened_attrs = flatten_attrs(attributes or {}) + flattened_attrs.update({attribute_type: username}) + attributes = expand_attrs(flattened_attrs) + # set username to None so that it will be default to the internal GUID + # when them user gets created + username = None + # once the username has been validated against a username attribute + # type, there is no need to attempt validation against the other + # type(s) + break + + # The provided username has not matched the required format for any + # of the possible attributes + if username is not None: + raise InvalidParameterException( + "Username should be either an email or a phone number." + ) + user = CognitoIdpUser( user_pool_id=user_pool.id, username=username, @@ -947,10 +1050,10 @@ class CognitoIdpBackend(BaseBackend): if user_pool is None: raise ResourceNotFoundError(client_id) - if username not in user_pool.users: + user = user_pool._get_user(username) + if not user: raise UserNotFoundError(username) - user = user_pool.users[username] user.status = UserStatus["CONFIRMED"] return "" @@ -976,7 +1079,7 @@ class CognitoIdpBackend(BaseBackend): ): raise NotAuthorizedError(secret_hash) - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -1001,7 +1104,7 @@ class CognitoIdpBackend(BaseBackend): username = auth_parameters.get("USERNAME") password = auth_parameters.get("PASSWORD") - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -1069,7 +1172,7 @@ class CognitoIdpBackend(BaseBackend): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -1081,7 +1184,7 @@ class CognitoIdpBackend(BaseBackend): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) @@ -1097,7 +1200,7 @@ class CognitoIdpBackend(BaseBackend): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool.users.get(username) + user = user_pool._get_user(username) if not user: raise UserNotFoundError(username) diff --git a/moto/cognitoidp/utils.py b/moto/cognitoidp/utils.py index 504dfbe5d..be00dbbbd 100644 --- a/moto/cognitoidp/utils.py +++ b/moto/cognitoidp/utils.py @@ -3,6 +3,12 @@ import string import hashlib import hmac import base64 +import re + +FORMATS = { + "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", + "phone_number": r"\+\d{,15}", +} PAGINATION_MODEL = { @@ -45,3 +51,18 @@ def check_secret_hash(app_client_secret, app_client_id, username, secret_hash): new_digest = hmac.new(key, msg, hashlib.sha256).digest() SECRET_HASH = base64.b64encode(new_digest).decode() return SECRET_HASH == secret_hash + + +def validate_username_format(username, _format="email"): + # if the value of the `_format` param other than `email` or `phone_number`, + # the default value for the regex will match nothing and the + # method will return None + return re.fullmatch(FORMATS.get(_format, r"a^"), username) + + +def flatten_attrs(attrs): + return {attr["Name"]: attr["Value"] for attr in attrs} + + +def expand_attrs(attrs): + return [{"Name": k, "Value": v} for k, v in attrs.items()] diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index 7cb8e8956..be9c63a26 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -913,6 +913,25 @@ def test_admin_add_user_to_group(): list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected +@mock_cognitoidp +def test_admin_add_user_to_group_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = "test@example.com" + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + result = conn.admin_add_user_to_group( + UserPoolId=user_pool_id, Username=username, GroupName=group_name + ) + list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected + + @mock_cognitoidp def test_admin_add_user_to_group_again_is_noop(): conn = boto3.client("cognito-idp", "us-west-2") @@ -930,6 +949,7 @@ def test_admin_add_user_to_group_again_is_noop(): conn.admin_add_user_to_group( UserPoolId=user_pool_id, Username=username, GroupName=group_name ) + # should there be an assertion here? @mock_cognitoidp @@ -1001,6 +1021,29 @@ def test_admin_list_groups_for_user(): result["Groups"][0]["GroupName"].should.equal(group_name) +@mock_cognitoidp +def test_admin_list_groups_for_user_with_username_attribute(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = "test@example.com" + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + conn.admin_add_user_to_group( + UserPoolId=user_pool_id, Username=username, GroupName=group_name + ) + + result = conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id) + + result["Groups"].should.have.length_of(1) + result["Groups"][0]["GroupName"].should.equal(group_name) + + @mock_cognitoidp def test_admin_list_groups_for_user_ignores_deleted_group(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1055,6 +1098,35 @@ def test_admin_remove_user_from_group(): ].should.have.length_of(0) +@mock_cognitoidp +def test_admin_remove_user_from_group_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = "test@example.com" + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + conn.admin_add_user_to_group( + UserPoolId=user_pool_id, Username=username, GroupName=group_name + ) + + result = conn.admin_remove_user_from_group( + UserPoolId=user_pool_id, Username=username, GroupName=group_name + ) + list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected + conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name)[ + "Users" + ].should.have.length_of(0) + conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id)[ + "Groups" + ].should.have.length_of(0) + + @mock_cognitoidp def test_admin_remove_user_from_group_again_is_noop(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1089,7 +1161,7 @@ def test_admin_create_user(): result["User"]["Username"].should.equal(username) result["User"]["UserStatus"].should.equal("FORCE_CHANGE_PASSWORD") - result["User"]["Attributes"].should.have.length_of(1) + result["User"]["Attributes"].should.have.length_of(2) def _verify_attribute(name, v): attr = [a for a in result["User"]["Attributes"] if a["Name"] == name] @@ -1100,6 +1172,84 @@ def test_admin_create_user(): result["User"]["Enabled"].should.equal(True) +@mock_cognitoidp +def test_admin_create_user_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = "test@example.com" + value = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + result = conn.admin_create_user( + UserPoolId=user_pool_id, + Username=username, + UserAttributes=[{"Name": "thing", "Value": value}], + ) + + result["User"]["Username"].should_not.equal(username) + result["User"]["UserStatus"].should.equal("FORCE_CHANGE_PASSWORD") + result["User"]["Attributes"].should.have.length_of(3) + + def _verify_attribute(name, v): + attr = [a for a in result["User"]["Attributes"] if a["Name"] == name] + attr.should.have.length_of(1) + attr[0]["Value"].should.equal(v) + + _verify_attribute("thing", value) + _verify_attribute("email", username) + result["User"]["Enabled"].should.equal(True) + + +@mock_cognitoidp +def test_admin_create_user_with_incorrect_username_attribute_type_fails(): + conn = boto3.client("cognito-idp", "us-west-2") + + value = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + + with pytest.raises(ClientError) as ex: + username = str(uuid.uuid4()) + conn.admin_create_user( + UserPoolId=user_pool_id, + Username=username, + UserAttributes=[{"Name": "thing", "Value": value}], + ) + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidParameterException") + err["Message"].should.equal("Username should be either an email or a phone number.") + + +@mock_cognitoidp +def test_admin_create_user_with_existing_username_attribute_fails(): + conn = boto3.client("cognito-idp", "us-west-2") + + value = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + + username = "test@example.com" + conn.admin_create_user( + UserPoolId=user_pool_id, + Username=username, + UserAttributes=[{"Name": "thing", "Value": value}], + ) + + with pytest.raises(ClientError) as ex: + username = "test@example.com" + conn.admin_create_user( + UserPoolId=user_pool_id, + Username=username, + UserAttributes=[{"Name": "thing", "Value": value}], + ) + err = ex.value.response["Error"] + err["Code"].should.equal("UsernameExistsException") + err["Message"].should.equal("test@example.com") + + @mock_cognitoidp def test_admin_create_existing_user(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1190,7 +1340,60 @@ def test_admin_get_user(): result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) result["Username"].should.equal(username) - result["UserAttributes"].should.have.length_of(1) + result["UserAttributes"].should.have.length_of(2) + + +@mock_cognitoidp +def test_admin_get_user_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = "test@example.com" + value = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email", "phone_number"] + )["UserPool"]["Id"] + conn.admin_create_user( + UserPoolId=user_pool_id, + Username=username, + UserAttributes=[ + {"Name": "thing", "Value": value}, + {"Name": "phone_number", "Value": "+123456789"}, + ], + ) + # verify user can be queried by email + result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) + result["Username"].should_not.equal(username) + result["UserAttributes"].should.have.length_of(4) + + def _verify_attribute(name, v): + attr = [a for a in result["UserAttributes"] if a["Name"] == name] + attr.should.have.length_of(1) + attr[0]["Value"].should.equal(v) + + _verify_attribute("phone_number", "+123456789") + _verify_attribute("email", "test@example.com") + + # verify user can be queried by phone number + result = conn.admin_get_user(UserPoolId=user_pool_id, Username="+123456789") + + result["Username"].should_not.equal(username) + result["UserAttributes"].should.have.length_of(4) + _verify_attribute("phone_number", "+123456789") + _verify_attribute("email", "test@example.com") + + # verify that the generate user sub is a valid UUID v4 + [user_sub] = [ + attr["Value"] for attr in result["UserAttributes"] if attr["Name"] == "sub" + ] + uuid.UUID(user_sub) + + # verify user should be queried by user sub + result = conn.admin_get_user(UserPoolId=user_pool_id, Username=user_sub) + + result["Username"].should_not.equal(username) + result["UserAttributes"].should.have.length_of(4) + _verify_attribute("phone_number", "+123456789") + _verify_attribute("email", "test@example.com") @mock_cognitoidp @@ -1209,13 +1412,29 @@ def test_admin_get_missing_user(): caught.should.be.true +@mock_cognitoidp +def test_admin_get_missing_user_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = "test@example.com" + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + + with pytest.raises(ClientError) as ex: + conn.admin_get_user(UserPoolId=user_pool_id, Username=username) + + err = ex.value.response["Error"] + err["Code"].should.equal("UserNotFoundException") + + @mock_cognitoidp def test_get_user(): conn = boto3.client("cognito-idp", "us-west-2") outputs = authentication_flow(conn, "ADMIN_NO_SRP_AUTH") result = conn.get_user(AccessToken=outputs["access_token"]) result["Username"].should.equal(outputs["username"]) - result["UserAttributes"].should.have.length_of(1) + result["UserAttributes"].should.have.length_of(2) def _verify_attribute(name, v): attr = [a for a in result["UserAttributes"] if a["Name"] == name] @@ -1317,6 +1536,50 @@ def test_list_users_invalid_attributes(): assert err["Message"].should.equal("Invalid search attribute: custom:foo") +@mock_cognitoidp +def test_list_users_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = "test@example.com" + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + result = conn.list_users(UserPoolId=user_pool_id) + result["Users"].should.have.length_of(1) + result["Users"][0]["Username"].should_not.equal(username) + + def _verify_attribute(name, v): + attr = [a for a in result["Users"][0]["Attributes"] if a["Name"] == name] + attr.should.have.length_of(1) + attr[0]["Value"].should.equal(v) + + _verify_attribute("email", username) + + username_bis = "test2@uexample.com" + conn.admin_create_user( + UserPoolId=user_pool_id, + Username=username_bis, + UserAttributes=[{"Name": "phone_number", "Value": "+33666666666"}], + ) + result = conn.list_users( + UserPoolId=user_pool_id, Filter='phone_number="+33666666666"' + ) + result["Users"].should.have.length_of(1) + result["Users"][0]["Username"].should_not.equal(username_bis) + uuid.UUID(result["Users"][0]["Username"]) + + _verify_attribute("email", username_bis) + + # checking Filter with space + result = conn.list_users( + UserPoolId=user_pool_id, Filter='phone_number = "+33666666666"' + ) + result["Users"].should.have.length_of(1) + result["Users"][0]["Username"].should_not.equal(username_bis) + _verify_attribute("email", username_bis) + + @mock_cognitoidp def test_list_users_inherent_attributes(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1442,6 +1705,24 @@ def test_admin_disable_user(): ].should.equal(False) +@mock_cognitoidp +def test_admin_disable_user_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = "test@example.com" + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + result = conn.admin_disable_user(UserPoolId=user_pool_id, Username=username) + list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected + + conn.admin_get_user(UserPoolId=user_pool_id, Username=username)[ + "Enabled" + ].should.equal(False) + + @mock_cognitoidp def test_admin_enable_user(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1459,6 +1740,25 @@ def test_admin_enable_user(): ].should.equal(True) +@mock_cognitoidp +def test_admin_enable_user_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = "test@example.com" + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + conn.admin_disable_user(UserPoolId=user_pool_id, Username=username) + + result = conn.admin_enable_user(UserPoolId=user_pool_id, Username=username) + list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected + + conn.admin_get_user(UserPoolId=user_pool_id, Username=username)[ + "Enabled" + ].should.equal(True) + + @mock_cognitoidp def test_admin_delete_user(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1477,6 +1777,24 @@ def test_admin_delete_user(): caught.should.be.true +@mock_cognitoidp +def test_admin_delete_user_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = "test@example.com" + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + conn.admin_delete_user(UserPoolId=user_pool_id, Username=username) + + with pytest.raises(ClientError) as ex: + conn.admin_get_user(UserPoolId=user_pool_id, Username=username) + + err = ex.value.response["Error"] + err["Code"].should.equal("UserNotFoundException") + + def authentication_flow(conn, auth_flow): username = str(uuid.uuid4()) temporary_password = str(uuid.uuid4()) @@ -1927,6 +2245,34 @@ def test_sign_up(): result["UserSub"].should_not.be.none +@mock_cognitoidp +def test_sign_up_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email", "phone_number"] + )["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), + )["UserPoolClient"]["ClientId"] + username = str(uuid.uuid4()) + password = str(uuid.uuid4()) + with pytest.raises(ClientError) as err: + # Attempt to add user again + result = conn.sign_up(ClientId=client_id, Username=username, Password=password) + err.value.response["Error"]["Code"].should.equal("InvalidParameterException") + + username = "test@example.com" + result = conn.sign_up(ClientId=client_id, Username=username, Password=password) + + result["UserConfirmed"].should.be.false + result["UserSub"].should_not.be.none + username = "+123456789" + result = conn.sign_up(ClientId=client_id, Username=username, Password=password) + + result["UserConfirmed"].should.be.false + result["UserSub"].should_not.be.none + + @mock_cognitoidp def test_sign_up_existing_user(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1966,6 +2312,27 @@ def test_confirm_sign_up(): result["UserStatus"].should.equal("CONFIRMED") +@mock_cognitoidp +def test_confirm_sign_up_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + username = "test@example.com" + password = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + conn.sign_up(ClientId=client_id, Username=username, Password=password) + + conn.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456", + ) + + result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) + result["UserStatus"].should.equal("CONFIRMED") + + @mock_cognitoidp def test_initiate_auth_USER_SRP_AUTH(): conn = boto3.client("cognito-idp", "us-west-2") @@ -2001,6 +2368,43 @@ def test_initiate_auth_USER_SRP_AUTH(): result["ChallengeName"].should.equal("PASSWORD_VERIFIER") +@mock_cognitoidp +def test_initiate_auth_USER_SRP_AUTH_with_username_attributes(): + conn = boto3.client("cognito-idp", "us-west-2") + username = "test@example.com" + password = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool( + PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] + )["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + conn.sign_up(ClientId=client_id, Username=username, Password=password) + client_secret = conn.describe_user_pool_client( + UserPoolId=user_pool_id, ClientId=client_id, + )["UserPoolClient"]["ClientSecret"] + conn.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456", + ) + + key = bytes(str(client_secret).encode("latin-1")) + msg = bytes(str(username + client_id).encode("latin-1")) + new_digest = hmac.new(key, msg, hashlib.sha256).digest() + secret_hash = base64.b64encode(new_digest).decode() + + result = conn.initiate_auth( + ClientId=client_id, + AuthFlow="USER_SRP_AUTH", + AuthParameters={ + "USERNAME": username, + "SRP_A": uuid.uuid4().hex, + "SECRET_HASH": secret_hash, + }, + ) + + result["ChallengeName"].should.equal("PASSWORD_VERIFIER") + + @mock_cognitoidp def test_initiate_auth_REFRESH_TOKEN(): conn = boto3.client("cognito-idp", "us-west-2") @@ -2260,7 +2664,7 @@ def test_admin_set_user_password(): ) result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) result["Username"].should.equal(username) - result["UserAttributes"].should.have.length_of(1) + result["UserAttributes"].should.have.length_of(2) def _verify_attribute(name, v): attr = [a for a in result["UserAttributes"] if a["Name"] == name]