From 98ca9b82e122a1549d290264c574b12287fe1a8a Mon Sep 17 00:00:00 2001 From: Maksymilian Babarowski Date: Wed, 27 Oct 2021 12:43:37 +0200 Subject: [PATCH] =?UTF-8?q?cognito-idp=20=E2=80=93=20Correct=20exception?= =?UTF-8?q?=20when=20user=20does=20not=20exist=20(#4482)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- moto/cognitoidp/models.py | 72 ++++++------------------ tests/test_cognitoidp/test_cognitoidp.py | 33 +++++------ 2 files changed, 33 insertions(+), 72 deletions(-) diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index b99db3b66..5cb0ed92d 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -632,8 +632,7 @@ class CognitoIdpBackend(BaseBackend): user_pool = self.describe_user_pool(user_pool_id) if message_action and message_action == "RESEND": - if not user_pool._get_user(username): - raise UserNotFoundError(username) + self.admin_get_user(user_pool_id, username) elif user_pool._get_user(username): raise UsernameExistsException(username) @@ -690,12 +689,7 @@ class CognitoIdpBackend(BaseBackend): return user def admin_confirm_sign_up(self, user_pool_id, username): - user_pool = self.describe_user_pool(user_pool_id) - - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(f"User does not exist.") - + user = self.admin_get_user(user_pool_id, username) user.status = UserStatus["CONFIRMED"] return "" @@ -704,14 +698,14 @@ class CognitoIdpBackend(BaseBackend): user = user_pool._get_user(username) if not user: - raise UserNotFoundError(username) + raise UserNotFoundError("User does not exist.") return user def get_user(self, access_token): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool._get_user(username) + user = self.admin_get_user(user_pool.id, username) if ( not user or not user.enabled 
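Review bot: In `get_user` (hunk `@@ -704,14 +698,14 @@`) the `not user` term in the condition after `user = self.admin_get_user(user_pool.id, username)` can no longer be true, because `admin_get_user` raises `UserNotFoundError` for a missing user instead of returning a falsy value. A token whose user has been removed therefore surfaces as a user-not-found error rather than whatever that condition's branch raises; the branch lies outside the hunk, so confirm which outcome is intended. The same leftover is in hunk `@@ -1122,7 +1092,7 @@`, where `if not user:` / `raise UserNotFoundError(username)` remains after the new call and is unreachable, still using the old username-as-message form. Either keep `user_pool._get_user(username)` in `get_user` if the token path should stay a not-authorized failure, or drop the dead checks in both places.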
@@ -737,10 +731,7 @@ class CognitoIdpBackend(BaseBackend): def admin_delete_user(self, user_pool_id, username): user_pool = self.describe_user_pool(user_pool_id) - - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool_id, username) for group in user.groups: group.users.remove(user) @@ -773,9 +764,7 @@ class CognitoIdpBackend(BaseBackend): if auth_flow in ("ADMIN_USER_PASSWORD_AUTH", "ADMIN_NO_SRP_AUTH"): username = auth_parameters.get("USERNAME") password = auth_parameters.get("PASSWORD") - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool_id, username) if user.password != password: raise NotAuthorizedError(username) @@ -829,9 +818,7 @@ class CognitoIdpBackend(BaseBackend): if challenge_name == "NEW_PASSWORD_REQUIRED": username = challenge_responses.get("USERNAME") new_password = challenge_responses.get("NEW_PASSWORD") - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool.id, username) user.password = new_password user.status = UserStatus.CONFIRMED @@ -840,9 +827,7 @@ class CognitoIdpBackend(BaseBackend): return self._log_user_in(user_pool, client, username) elif challenge_name == "PASSWORD_VERIFIER": username = challenge_responses.get("USERNAME") - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool.id, username) password_claim_signature = challenge_responses.get( "PASSWORD_CLAIM_SIGNATURE" 
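Review bot: In the `ADMIN_USER_PASSWORD_AUTH` / `ADMIN_NO_SRP_AUTH` branch (hunk `@@ -773,9 +764,7 @@`) an unknown username now produces `UserNotFoundError` with "User does not exist." while a wrong password produces `NotAuthorizedError(username)`, so the mock answers the two cases differently on a sign-in path. App clients that set `PreventUserExistenceErrors` to `ENABLED` get a generic not-authorized error for both, and nothing in this hunk consults such a setting; whether the backend models it elsewhere is not visible here. A comment above the call would stop test authors from reading the mock's response as the hardened behaviour, for example `# user-existence errors are not suppressed on this path (legacy behaviour)`. The same applies to the challenge hunks `@@ -829,9 +818,7 @@` and `@@ -840,9 +827,7 @@`; the function bodies start and end outside the hunks.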
@@ -876,9 +861,7 @@ class CognitoIdpBackend(BaseBackend): return self._log_user_in(user_pool, client, username) elif challenge_name == "SOFTWARE_TOKEN_MFA": username = challenge_responses.get("USERNAME") - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + self.admin_get_user(user_pool.id, username) software_token_mfa_code = challenge_responses.get("SOFTWARE_TOKEN_MFA_CODE") if not software_token_mfa_code: @@ -948,9 +931,7 @@ class CognitoIdpBackend(BaseBackend): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool.id, username) if user.password != previous_password: raise NotAuthorizedError(username) @@ -967,20 +948,13 @@ class CognitoIdpBackend(BaseBackend): raise NotAuthorizedError(access_token) def admin_update_user_attributes(self, user_pool_id, username, attributes): - user_pool = self.describe_user_pool(user_pool_id) - - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool_id, username) user.update_attributes(attributes) def admin_user_global_sign_out(self, user_pool_id, username): user_pool = self.describe_user_pool(user_pool_id) - - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + self.admin_get_user(user_pool_id, username) for token, token_tuple in list(user_pool.refresh_tokens.items()): _, username = token_tuple @@ -1068,9 +1042,7 @@ class CognitoIdpBackend(BaseBackend): if user_pool is None: raise ResourceNotFoundError(client_id) - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool.id, username) user.status = UserStatus.CONFIRMED return "" 
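Review bot: `admin_user_global_sign_out` (hunk `@@ -967,20 +948,13 @@`) now discards the result of `self.admin_get_user(user_pool_id, username)`, and the loop that follows rebinds the `username` parameter with `_, username = token_tuple`. After the first iteration the requested user's name is no longer available to compare against. If the rest of the loop (outside this hunk) matches tokens on `username`, it compares each token's owner with itself and could clear refresh tokens that belong to other users. Unpacking into a separate name, `_, token_username = token_tuple`, and testing `if token_username == username:` keeps the sign-out scoped to the requested user.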
@@ -1097,9 +1069,7 @@ class CognitoIdpBackend(BaseBackend): ): raise NotAuthorizedError(secret_hash) - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool.id, username) if user.status is UserStatus.UNCONFIRMED: raise UserNotConfirmedException("User is not confirmed.") @@ -1122,7 +1092,7 @@ class CognitoIdpBackend(BaseBackend): username = auth_parameters.get("USERNAME") password = auth_parameters.get("PASSWORD") - user = user_pool._get_user(username) + user = self.admin_get_user(user_pool.id, username) if not user: raise UserNotFoundError(username) @@ -1190,9 +1160,7 @@ class CognitoIdpBackend(BaseBackend): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + self.admin_get_user(user_pool.id, username) return {"SecretCode": str(uuid.uuid4())} else: @@ -1202,9 +1170,7 @@ class CognitoIdpBackend(BaseBackend): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool.id, username) user.token_verified = True @@ -1218,9 +1184,7 @@ class CognitoIdpBackend(BaseBackend): for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] - user = user_pool._get_user(username) - if not user: - raise UserNotFoundError(username) + user = self.admin_get_user(user_pool.id, username) if software_token_mfa_settings["Enabled"]: if user.token_verified: diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index b312662cb..78be1b2a8 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py 
+++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -1389,18 +1389,16 @@ def test_admin_resend_invitation_missing_user(): value = str(uuid.uuid4()) user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] - caught = False - try: + with pytest.raises(ClientError) as exc: conn.admin_create_user( UserPoolId=user_pool_id, Username=username, UserAttributes=[{"Name": "thing", "Value": value}], MessageAction="RESEND", ) - except conn.exceptions.UserNotFoundException: - caught = True - - caught.should.be.true + err = exc.value.response["Error"] + err["Code"].should.equal("UserNotFoundException") + err["Message"].should.equal(f"User does not exist.") @mock_cognitoidp @@ -1481,13 +1479,12 @@ def test_admin_get_missing_user(): username = str(uuid.uuid4()) user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] - caught = False - try: + with pytest.raises(ClientError) as exc: conn.admin_get_user(UserPoolId=user_pool_id, Username=username) - except conn.exceptions.UserNotFoundException: - caught = True - caught.should.be.true + err = exc.value.response["Error"] + err["Code"].should.equal("UserNotFoundException") + err["Message"].should.equal(f"User does not exist.") @mock_cognitoidp @@ -1499,11 +1496,12 @@ def test_admin_get_missing_user_with_username_attributes(): PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] )["UserPool"]["Id"] - with pytest.raises(ClientError) as ex: + with pytest.raises(ClientError) as exc: conn.admin_get_user(UserPoolId=user_pool_id, Username=username) - err = ex.value.response["Error"] + err = exc.value.response["Error"] err["Code"].should.equal("UserNotFoundException") + err["Message"].should.equal(f"User does not exist.") @mock_cognitoidp 
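Review bot: The new assertion `err["Message"].should.equal(f"User does not exist.")` in `test_admin_resend_invitation_missing_user` uses an f-string with no placeholders, while the same commit removes exactly that prefix from `UserNotFoundError(f"User does not exist.")` in models.py. A plain literal, `err["Message"].should.equal("User does not exist.")`, is enough here and in hunks `@@ -1481,13 +1479,12 @@` and `@@ -1499,11 +1496,12 @@`. That matches the form the commit already uses in hunk `@@ -2351,6 +2347,7 @@`.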
@@ -1846,13 +1844,11 @@ def test_admin_delete_user(): conn.admin_create_user(UserPoolId=user_pool_id, Username=username) conn.admin_delete_user(UserPoolId=user_pool_id, Username=username) - caught = False - try: + with pytest.raises(ClientError) as exc: conn.admin_get_user(UserPoolId=user_pool_id, Username=username) - except conn.exceptions.UserNotFoundException: - caught = True - caught.should.be.true + err = exc.value.response["Error"] + err["Code"].should.equal("UserNotFoundException") @mock_cognitoidp @@ -2351,6 +2347,7 @@ def test_admin_user_global_sign_out_unknown_user(): ) err = ex.value.response["Error"] err["Code"].should.equal("UserNotFoundException") + err["Message"].should.equal("User does not exist.") @mock_cognitoidp