CognitoIDP: Error when detecting a duplicate email (#5876)

This commit is contained in:
Bert Blommers 2023-01-25 09:20:03 -01:00 committed by GitHub
parent 6d41ad72e0
commit 8dcf2d33ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 0 deletions

View File

@ -2,6 +2,13 @@ from moto.core.exceptions import JsonRESTError
from typing import Optional from typing import Optional
class AliasExistsException(JsonRESTError):
def __init__(self) -> None:
super().__init__(
"AliasExistsException", "An account with the given email already exists."
)
class ResourceNotFoundError(JsonRESTError): class ResourceNotFoundError(JsonRESTError):
def __init__(self, message: Optional[str]): def __init__(self, message: Optional[str]):
super().__init__(error_type="ResourceNotFoundException", message=message or "") super().__init__(error_type="ResourceNotFoundException", message=message or "")

View File

@ -11,6 +11,7 @@ from typing import Any, Dict, List, Tuple, Optional, Set
from moto.core import BaseBackend, BackendDict, BaseModel from moto.core import BaseBackend, BackendDict, BaseModel
from moto.moto_api._internal import mock_random as random from moto.moto_api._internal import mock_random as random
from .exceptions import ( from .exceptions import (
AliasExistsException,
GroupExistsException, GroupExistsException,
NotAuthorizedError, NotAuthorizedError,
ResourceNotFoundError, ResourceNotFoundError,
@ -1636,6 +1637,9 @@ class CognitoIdpBackend(BaseBackend):
) -> None: ) -> None:
user = self.admin_get_user(user_pool_id, username) user = self.admin_get_user(user_pool_id, username)
email = self._find_attr("email", attributes)
self._verify_email_is_not_used(user_pool_id, email)
user.update_attributes(attributes) user.update_attributes(attributes)
def admin_delete_user_attributes( def admin_delete_user_attributes(
@ -2031,11 +2035,32 @@ class CognitoIdpBackend(BaseBackend):
_, username = user_pool.access_tokens[access_token] _, username = user_pool.access_tokens[access_token]
user = self.admin_get_user(user_pool.id, username) user = self.admin_get_user(user_pool.id, username)
email = self._find_attr("email", attributes)
self._verify_email_is_not_used(user_pool.id, email)
user.update_attributes(attributes) user.update_attributes(attributes)
return return
raise NotAuthorizedError(access_token) raise NotAuthorizedError(access_token)
def _find_attr(self, name: str, attrs: List[Dict[str, str]]) -> Optional[str]:
return next((a["Value"] for a in attrs if a["Name"] == name), None)
def _verify_email_is_not_used(
self, user_pool_id: str, email: Optional[str]
) -> None:
if not email:
# We're not updating emails
return
user_pool = self.describe_user_pool(user_pool_id)
if "email" not in user_pool.extended_config.get("UsernameAttributes", []):
# email is not used as a username - duplicate emails are allowed
return
for user in user_pool.users.values():
if user.attribute_lookup.get("email", "") == email:
raise AliasExistsException
class RegionAgnosticBackend: class RegionAgnosticBackend:
# Some operations are unauthenticated # Some operations are unauthenticated

View File

@ -1,6 +1,8 @@
from unittest import TestCase from unittest import TestCase
import boto3 import boto3
import pytest
from moto import mock_cognitoidp from moto import mock_cognitoidp
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
@ -49,3 +51,47 @@ class TestCognitoUserDeleter(TestCase):
}, },
) )
exc.exception.response["Error"]["Code"].should.equal("NotAuthorizedException") exc.exception.response["Error"]["Code"].should.equal("NotAuthorizedException")
@mock_cognitoidp
class TestCognitoUserPoolDuplidateEmails(TestCase):
def setUp(self) -> None:
self.client = boto3.client("cognito-idp", "us-east-1")
self.pool_id1 = self.client.create_user_pool(PoolName="test")["UserPool"]["Id"]
self.pool_id2 = self.client.create_user_pool(
PoolName="test", UsernameAttributes=["email"]
)["UserPool"]["Id"]
# create two users
for user in ["user1", "user2"]:
self.client.admin_create_user(
UserPoolId=self.pool_id1,
Username=user,
UserAttributes=[{"Name": "email", "Value": f"{user}@test.com"}],
)
self.client.admin_create_user(
UserPoolId=self.pool_id2,
Username=f"{user}@test.com",
UserAttributes=[{"Name": "email", "Value": f"{user}@test.com"}],
)
def test_use_existing_email__when_email_is_login(self):
with pytest.raises(ClientError) as exc:
self.client.admin_update_user_attributes(
UserPoolId=self.pool_id2,
Username="user1@test.com",
UserAttributes=[{"Name": "email", "Value": "user2@test.com"}],
)
err = exc.value.response["Error"]
err["Code"].should.equal("AliasExistsException")
err["Message"].should.equal("An account with the given email already exists.")
def test_use_existing_email__when_username_is_login(self):
# Because we cannot use the email as username,
# multiple users can have the same email address
self.client.admin_update_user_attributes(
UserPoolId=self.pool_id1,
Username="user1",
UserAttributes=[{"Name": "email", "Value": "user2@test.com"}],
)