From 8dcf2d33edf1ac8d6a3889e406315cee15f7d02a Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Wed, 25 Jan 2023 09:20:03 -0100 Subject: [PATCH] CognitoIDP: Error when detecting a duplicate email (#5876) --- moto/cognitoidp/exceptions.py | 7 +++ moto/cognitoidp/models.py | 25 ++++++++++ .../test_cognitoidp_exceptions.py | 46 +++++++++++++++++++ 3 files changed, 78 insertions(+) diff --git a/moto/cognitoidp/exceptions.py b/moto/cognitoidp/exceptions.py index b234a5238..8271af59e 100644 --- a/moto/cognitoidp/exceptions.py +++ b/moto/cognitoidp/exceptions.py @@ -2,6 +2,13 @@ from moto.core.exceptions import JsonRESTError from typing import Optional +class AliasExistsException(JsonRESTError): + def __init__(self) -> None: + super().__init__( + "AliasExistsException", "An account with the given email already exists." + ) + + class ResourceNotFoundError(JsonRESTError): def __init__(self, message: Optional[str]): super().__init__(error_type="ResourceNotFoundException", message=message or "") diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index 3ddb8f113..3c6e97c62 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -11,6 +11,7 @@ from typing import Any, Dict, List, Tuple, Optional, Set from moto.core import BaseBackend, BackendDict, BaseModel from moto.moto_api._internal import mock_random as random from .exceptions import ( + AliasExistsException, GroupExistsException, NotAuthorizedError, ResourceNotFoundError, @@ -1636,6 +1637,9 @@ class CognitoIdpBackend(BaseBackend): ) -> None: user = self.admin_get_user(user_pool_id, username) + email = self._find_attr("email", attributes) + self._verify_email_is_not_used(user_pool_id, email) + user.update_attributes(attributes) def admin_delete_user_attributes( @@ -2031,11 +2035,32 @@ class CognitoIdpBackend(BaseBackend): _, username = user_pool.access_tokens[access_token] user = self.admin_get_user(user_pool.id, username) + email = self._find_attr("email", attributes) + self._verify_email_is_not_used(user_pool.id, email) + user.update_attributes(attributes) return raise NotAuthorizedError(access_token) + def _find_attr(self, name: str, attrs: List[Dict[str, str]]) -> Optional[str]: + return next((a["Value"] for a in attrs if a["Name"] == name), None) + + def _verify_email_is_not_used( + self, user_pool_id: str, email: Optional[str] + ) -> None: + if not email: + # We're not updating emails + return + user_pool = self.describe_user_pool(user_pool_id) + if "email" not in user_pool.extended_config.get("UsernameAttributes", []): + # email is not used as a username - duplicate emails are allowed + return + + for user in user_pool.users.values(): + if user.attribute_lookup.get("email", "") == email: + raise AliasExistsException + class RegionAgnosticBackend: # Some operations are unauthenticated diff --git a/tests/test_cognitoidp/test_cognitoidp_exceptions.py b/tests/test_cognitoidp/test_cognitoidp_exceptions.py index ec19de0a0..0cd827bdc 100644 --- a/tests/test_cognitoidp/test_cognitoidp_exceptions.py +++ b/tests/test_cognitoidp/test_cognitoidp_exceptions.py @@ -1,6 +1,8 @@ from unittest import TestCase import boto3 +import pytest + from moto import mock_cognitoidp from botocore.exceptions import ClientError @@ -49,3 +51,47 @@ class TestCognitoUserDeleter(TestCase): }, ) exc.exception.response["Error"]["Code"].should.equal("NotAuthorizedException") + + +@mock_cognitoidp +class TestCognitoUserPoolDuplidateEmails(TestCase): + def setUp(self) -> None: + self.client = boto3.client("cognito-idp", "us-east-1") + + self.pool_id1 = self.client.create_user_pool(PoolName="test")["UserPool"]["Id"] + self.pool_id2 = self.client.create_user_pool( + PoolName="test", UsernameAttributes=["email"] + )["UserPool"]["Id"] + + # create two users + for user in ["user1", "user2"]: + self.client.admin_create_user( + UserPoolId=self.pool_id1, + Username=user, + UserAttributes=[{"Name": "email", "Value": f"{user}@test.com"}], + ) + self.client.admin_create_user( + UserPoolId=self.pool_id2, + Username=f"{user}@test.com", + UserAttributes=[{"Name": "email", "Value": f"{user}@test.com"}], + ) + + def test_use_existing_email__when_email_is_login(self): + with pytest.raises(ClientError) as exc: + self.client.admin_update_user_attributes( + UserPoolId=self.pool_id2, + Username="user1@test.com", + UserAttributes=[{"Name": "email", "Value": "user2@test.com"}], + ) + err = exc.value.response["Error"] + err["Code"].should.equal("AliasExistsException") + err["Message"].should.equal("An account with the given email already exists.") + + def test_use_existing_email__when_username_is_login(self): + # Because we cannot use the email as username, + # multiple users can have the same email address + self.client.admin_update_user_attributes( + UserPoolId=self.pool_id1, + Username="user1", + UserAttributes=[{"Name": "email", "Value": "user2@test.com"}], + )