cognito-idp – Correct exception when user does not exist (#4482)
This commit is contained in:
		
							parent
							
								
									2f05bca27b
								
							
						
					
					
						commit
						98ca9b82e1
					
				| @ -632,8 +632,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         user_pool = self.describe_user_pool(user_pool_id) | ||||
| 
 | ||||
|         if message_action and message_action == "RESEND": | ||||
|             if not user_pool._get_user(username): | ||||
|                 raise UserNotFoundError(username) | ||||
|             self.admin_get_user(user_pool_id, username) | ||||
|         elif user_pool._get_user(username): | ||||
|             raise UsernameExistsException(username) | ||||
| 
 | ||||
| @ -690,12 +689,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         return user | ||||
| 
 | ||||
|     def admin_confirm_sign_up(self, user_pool_id, username): | ||||
|         user_pool = self.describe_user_pool(user_pool_id) | ||||
| 
 | ||||
|         user = user_pool._get_user(username) | ||||
|         if not user: | ||||
|             raise UserNotFoundError(f"User does not exist.") | ||||
| 
 | ||||
|         user = self.admin_get_user(user_pool_id, username) | ||||
|         user.status = UserStatus["CONFIRMED"] | ||||
|         return "" | ||||
| 
 | ||||
| @ -704,14 +698,14 @@ class CognitoIdpBackend(BaseBackend): | ||||
| 
 | ||||
|         user = user_pool._get_user(username) | ||||
|         if not user: | ||||
|             raise UserNotFoundError(username) | ||||
|             raise UserNotFoundError("User does not exist.") | ||||
|         return user | ||||
| 
 | ||||
|     def get_user(self, access_token): | ||||
|         for user_pool in self.user_pools.values(): | ||||
|             if access_token in user_pool.access_tokens: | ||||
|                 _, username = user_pool.access_tokens[access_token] | ||||
|                 user = user_pool._get_user(username) | ||||
|                 user = self.admin_get_user(user_pool.id, username) | ||||
|                 if ( | ||||
|                     not user | ||||
|                     or not user.enabled | ||||
| @ -737,10 +731,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
| 
 | ||||
|     def admin_delete_user(self, user_pool_id, username): | ||||
|         user_pool = self.describe_user_pool(user_pool_id) | ||||
| 
 | ||||
|         user = user_pool._get_user(username) | ||||
|         if not user: | ||||
|             raise UserNotFoundError(username) | ||||
|         user = self.admin_get_user(user_pool_id, username) | ||||
| 
 | ||||
|         for group in user.groups: | ||||
|             group.users.remove(user) | ||||
| @ -773,9 +764,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         if auth_flow in ("ADMIN_USER_PASSWORD_AUTH", "ADMIN_NO_SRP_AUTH"): | ||||
|             username = auth_parameters.get("USERNAME") | ||||
|             password = auth_parameters.get("PASSWORD") | ||||
|             user = user_pool._get_user(username) | ||||
|             if not user: | ||||
|                 raise UserNotFoundError(username) | ||||
|             user = self.admin_get_user(user_pool_id, username) | ||||
| 
 | ||||
|             if user.password != password: | ||||
|                 raise NotAuthorizedError(username) | ||||
| @ -829,9 +818,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         if challenge_name == "NEW_PASSWORD_REQUIRED": | ||||
|             username = challenge_responses.get("USERNAME") | ||||
|             new_password = challenge_responses.get("NEW_PASSWORD") | ||||
|             user = user_pool._get_user(username) | ||||
|             if not user: | ||||
|                 raise UserNotFoundError(username) | ||||
|             user = self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|             user.password = new_password | ||||
|             user.status = UserStatus.CONFIRMED | ||||
| @ -840,9 +827,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|             return self._log_user_in(user_pool, client, username) | ||||
|         elif challenge_name == "PASSWORD_VERIFIER": | ||||
|             username = challenge_responses.get("USERNAME") | ||||
|             user = user_pool._get_user(username) | ||||
|             if not user: | ||||
|                 raise UserNotFoundError(username) | ||||
|             user = self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|             password_claim_signature = challenge_responses.get( | ||||
|                 "PASSWORD_CLAIM_SIGNATURE" | ||||
| @ -876,9 +861,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|             return self._log_user_in(user_pool, client, username) | ||||
|         elif challenge_name == "SOFTWARE_TOKEN_MFA": | ||||
|             username = challenge_responses.get("USERNAME") | ||||
|             user = user_pool._get_user(username) | ||||
|             if not user: | ||||
|                 raise UserNotFoundError(username) | ||||
|             self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|             software_token_mfa_code = challenge_responses.get("SOFTWARE_TOKEN_MFA_CODE") | ||||
|             if not software_token_mfa_code: | ||||
| @ -948,9 +931,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         for user_pool in self.user_pools.values(): | ||||
|             if access_token in user_pool.access_tokens: | ||||
|                 _, username = user_pool.access_tokens[access_token] | ||||
|                 user = user_pool._get_user(username) | ||||
|                 if not user: | ||||
|                     raise UserNotFoundError(username) | ||||
|                 user = self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|                 if user.password != previous_password: | ||||
|                     raise NotAuthorizedError(username) | ||||
| @ -967,20 +948,13 @@ class CognitoIdpBackend(BaseBackend): | ||||
|             raise NotAuthorizedError(access_token) | ||||
| 
 | ||||
|     def admin_update_user_attributes(self, user_pool_id, username, attributes): | ||||
|         user_pool = self.describe_user_pool(user_pool_id) | ||||
| 
 | ||||
|         user = user_pool._get_user(username) | ||||
|         if not user: | ||||
|             raise UserNotFoundError(username) | ||||
|         user = self.admin_get_user(user_pool_id, username) | ||||
| 
 | ||||
|         user.update_attributes(attributes) | ||||
| 
 | ||||
|     def admin_user_global_sign_out(self, user_pool_id, username): | ||||
|         user_pool = self.describe_user_pool(user_pool_id) | ||||
| 
 | ||||
|         user = user_pool._get_user(username) | ||||
|         if not user: | ||||
|             raise UserNotFoundError(username) | ||||
|         self.admin_get_user(user_pool_id, username) | ||||
| 
 | ||||
|         for token, token_tuple in list(user_pool.refresh_tokens.items()): | ||||
|             _, username = token_tuple | ||||
| @ -1068,9 +1042,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         if user_pool is None: | ||||
|             raise ResourceNotFoundError(client_id) | ||||
| 
 | ||||
|         user = user_pool._get_user(username) | ||||
|         if not user: | ||||
|             raise UserNotFoundError(username) | ||||
|         user = self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|         user.status = UserStatus.CONFIRMED | ||||
|         return "" | ||||
| @ -1097,9 +1069,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|                 ): | ||||
|                     raise NotAuthorizedError(secret_hash) | ||||
| 
 | ||||
|             user = user_pool._get_user(username) | ||||
|             if not user: | ||||
|                 raise UserNotFoundError(username) | ||||
|             user = self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|             if user.status is UserStatus.UNCONFIRMED: | ||||
|                 raise UserNotConfirmedException("User is not confirmed.") | ||||
| @ -1122,7 +1092,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|             username = auth_parameters.get("USERNAME") | ||||
|             password = auth_parameters.get("PASSWORD") | ||||
| 
 | ||||
|             user = user_pool._get_user(username) | ||||
|             user = self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|             if not user: | ||||
|                 raise UserNotFoundError(username) | ||||
| @ -1190,9 +1160,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         for user_pool in self.user_pools.values(): | ||||
|             if access_token in user_pool.access_tokens: | ||||
|                 _, username = user_pool.access_tokens[access_token] | ||||
|                 user = user_pool._get_user(username) | ||||
|                 if not user: | ||||
|                     raise UserNotFoundError(username) | ||||
|                 self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|                 return {"SecretCode": str(uuid.uuid4())} | ||||
|         else: | ||||
| @ -1202,9 +1170,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         for user_pool in self.user_pools.values(): | ||||
|             if access_token in user_pool.access_tokens: | ||||
|                 _, username = user_pool.access_tokens[access_token] | ||||
|                 user = user_pool._get_user(username) | ||||
|                 if not user: | ||||
|                     raise UserNotFoundError(username) | ||||
|                 user = self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|                 user.token_verified = True | ||||
| 
 | ||||
| @ -1218,9 +1184,7 @@ class CognitoIdpBackend(BaseBackend): | ||||
|         for user_pool in self.user_pools.values(): | ||||
|             if access_token in user_pool.access_tokens: | ||||
|                 _, username = user_pool.access_tokens[access_token] | ||||
|                 user = user_pool._get_user(username) | ||||
|                 if not user: | ||||
|                     raise UserNotFoundError(username) | ||||
|                 user = self.admin_get_user(user_pool.id, username) | ||||
| 
 | ||||
|                 if software_token_mfa_settings["Enabled"]: | ||||
|                     if user.token_verified: | ||||
|  | ||||
| @ -1389,18 +1389,16 @@ def test_admin_resend_invitation_missing_user(): | ||||
|     value = str(uuid.uuid4()) | ||||
|     user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] | ||||
| 
 | ||||
|     caught = False | ||||
|     try: | ||||
|     with pytest.raises(ClientError) as exc: | ||||
|         conn.admin_create_user( | ||||
|             UserPoolId=user_pool_id, | ||||
|             Username=username, | ||||
|             UserAttributes=[{"Name": "thing", "Value": value}], | ||||
|             MessageAction="RESEND", | ||||
|         ) | ||||
|     except conn.exceptions.UserNotFoundException: | ||||
|         caught = True | ||||
| 
 | ||||
|     caught.should.be.true | ||||
|     err = exc.value.response["Error"] | ||||
|     err["Code"].should.equal("UserNotFoundException") | ||||
|     err["Message"].should.equal(f"User does not exist.") | ||||
| 
 | ||||
| 
 | ||||
| @mock_cognitoidp | ||||
| @ -1481,13 +1479,12 @@ def test_admin_get_missing_user(): | ||||
|     username = str(uuid.uuid4()) | ||||
|     user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] | ||||
| 
 | ||||
|     caught = False | ||||
|     try: | ||||
|     with pytest.raises(ClientError) as exc: | ||||
|         conn.admin_get_user(UserPoolId=user_pool_id, Username=username) | ||||
|     except conn.exceptions.UserNotFoundException: | ||||
|         caught = True | ||||
| 
 | ||||
|     caught.should.be.true | ||||
|     err = exc.value.response["Error"] | ||||
|     err["Code"].should.equal("UserNotFoundException") | ||||
|     err["Message"].should.equal(f"User does not exist.") | ||||
| 
 | ||||
| 
 | ||||
| @mock_cognitoidp | ||||
| @ -1499,11 +1496,12 @@ def test_admin_get_missing_user_with_username_attributes(): | ||||
|         PoolName=str(uuid.uuid4()), UsernameAttributes=["email"] | ||||
|     )["UserPool"]["Id"] | ||||
| 
 | ||||
|     with pytest.raises(ClientError) as ex: | ||||
|     with pytest.raises(ClientError) as exc: | ||||
|         conn.admin_get_user(UserPoolId=user_pool_id, Username=username) | ||||
| 
 | ||||
|     err = ex.value.response["Error"] | ||||
|     err = exc.value.response["Error"] | ||||
|     err["Code"].should.equal("UserNotFoundException") | ||||
|     err["Message"].should.equal(f"User does not exist.") | ||||
| 
 | ||||
| 
 | ||||
| @mock_cognitoidp | ||||
| @ -1846,13 +1844,11 @@ def test_admin_delete_user(): | ||||
|     conn.admin_create_user(UserPoolId=user_pool_id, Username=username) | ||||
|     conn.admin_delete_user(UserPoolId=user_pool_id, Username=username) | ||||
| 
 | ||||
|     caught = False | ||||
|     try: | ||||
|     with pytest.raises(ClientError) as exc: | ||||
|         conn.admin_get_user(UserPoolId=user_pool_id, Username=username) | ||||
|     except conn.exceptions.UserNotFoundException: | ||||
|         caught = True | ||||
| 
 | ||||
|     caught.should.be.true | ||||
|     err = exc.value.response["Error"] | ||||
|     err["Code"].should.equal("UserNotFoundException") | ||||
| 
 | ||||
| 
 | ||||
| @mock_cognitoidp | ||||
| @ -2351,6 +2347,7 @@ def test_admin_user_global_sign_out_unknown_user(): | ||||
|         ) | ||||
|     err = ex.value.response["Error"] | ||||
|     err["Code"].should.equal("UserNotFoundException") | ||||
|     err["Message"].should.equal("User does not exist.") | ||||
| 
 | ||||
| 
 | ||||
| @mock_cognitoidp | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user