From 230e34748fab26b58b5ce4311fa5b2f93f6f1da1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz?= <38426907+nluk@users.noreply.github.com> Date: Thu, 14 Oct 2021 12:12:08 +0200 Subject: [PATCH] Added admin_reset_user_password implementation (#4412) --- moto/cognitoidp/models.py | 42 +++++-- moto/cognitoidp/responses.py | 8 ++ tests/test_cognitoidp/test_cognitoidp.py | 150 +++++++++++++++++++++++ 3 files changed, 193 insertions(+), 7 deletions(-) diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index f42d8177e..9f69ba56f 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -27,6 +27,7 @@ UserStatus = { "FORCE_CHANGE_PASSWORD": "FORCE_CHANGE_PASSWORD", "CONFIRMED": "CONFIRMED", "UNCONFIRMED": "UNCONFIRMED", + "RESET_REQUIRED": "RESET_REQUIRED", } @@ -267,6 +268,7 @@ class CognitoIdpUser(BaseModel): self.status = status self.enabled = True self.attributes = attributes + self.attribute_lookup = self._flatten_attributes(attributes) self.create_date = datetime.datetime.utcnow() self.last_modified_date = datetime.datetime.utcnow() self.sms_mfa_enabled = False @@ -306,15 +308,16 @@ class CognitoIdpUser(BaseModel): return user_json - def update_attributes(self, new_attributes): - def flatten_attrs(attrs): - return {attr["Name"]: attr["Value"] for attr in attrs} + def _flatten_attributes(self, attributes): + return {attr["Name"]: attr["Value"] for attr in attributes} + def update_attributes(self, new_attributes): def expand_attrs(attrs): return [{"Name": k, "Value": v} for k, v in attrs.items()] - flat_attributes = flatten_attrs(self.attributes) - flat_attributes.update(flatten_attrs(new_attributes)) + flat_attributes = self._flatten_attributes(self.attributes) + flat_attributes.update(self._flatten_attributes(new_attributes)) + self.attribute_lookup = flat_attributes self.attributes = expand_attrs(flat_attributes) @@ -597,6 +600,25 @@ class CognitoIdpBackend(BaseBackend): group.users.discard(user) user.groups.discard(group) + def admin_reset_user_password(self, user_pool_id, username): + user = self.admin_get_user(user_pool_id, username) + if not user.enabled: + raise NotAuthorizedError("User is disabled") + if user.status == UserStatus["RESET_REQUIRED"]: + return + if user.status != UserStatus["CONFIRMED"]: + raise NotAuthorizedError( + "User password cannot be reset in the current state." + ) + if ( + user.attribute_lookup.get("email_verified", "false") == "false" + and user.attribute_lookup.get("phone_number_verified", "false") == "false" + ): + raise InvalidParameterException( + "Cannot reset password for the user as there is no registered/verified email or phone_number" + ) + user.status = UserStatus["RESET_REQUIRED"] + # User def admin_create_user( self, user_pool_id, username, message_action, temporary_password, attributes @@ -710,7 +732,10 @@ class CognitoIdpBackend(BaseBackend): if user.password != password: raise NotAuthorizedError(username) - if user.status == UserStatus["FORCE_CHANGE_PASSWORD"]: + if user.status in [ + UserStatus["FORCE_CHANGE_PASSWORD"], + UserStatus["RESET_REQUIRED"], + ]: session = str(uuid.uuid4()) self.sessions[session] = user_pool @@ -844,7 +869,10 @@ class CognitoIdpBackend(BaseBackend): raise NotAuthorizedError(username) user.password = proposed_password - if user.status == UserStatus["FORCE_CHANGE_PASSWORD"]: + if user.status in [ + UserStatus["FORCE_CHANGE_PASSWORD"], + UserStatus["RESET_REQUIRED"], + ]: user.status = UserStatus["CONFIRMED"] break diff --git a/moto/cognitoidp/responses.py b/moto/cognitoidp/responses.py index 2e4154ae4..282de42eb 100644 --- a/moto/cognitoidp/responses.py +++ b/moto/cognitoidp/responses.py @@ -292,6 +292,14 @@ class CognitoIdpResponse(BaseResponse): return "" + def admin_reset_user_password(self): + user_pool_id = self._get_param("UserPoolId") + username = self._get_param("Username") + cognitoidp_backends[self.region].admin_reset_user_password( + user_pool_id, username + ) + return "" + # User def admin_create_user(self): user_pool_id = self._get_param("UserPoolId") diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index 0e8311978..6c038a000 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -2253,6 +2253,156 @@ def test_confirm_forgot_password_with_non_existent_client_id_raises_error(): ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundException") +@mock_cognitoidp +def test_admin_reset_password_and_change_password(): + client = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + temporary_pass = str(uuid.uuid4()) + # Create pool and client + user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = client.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + # Create CONFIRMED user with verified email + client.admin_create_user( + UserPoolId=user_pool_id, Username=username, TemporaryPassword=temporary_pass + ) + client.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456" + ) + client.admin_update_user_attributes( + UserPoolId=user_pool_id, + Username=username, + UserAttributes=[{"Name": "email_verified", "Value": "true"}], + ) + + # User should be in RESET_REQUIRED state after reset + client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username) + result = client.admin_get_user(UserPoolId=user_pool_id, Username=username) + result["UserStatus"].should.equal("RESET_REQUIRED") + + # Return to CONFIRMED status after NEW_PASSWORD_REQUIRED auth challenge + auth_result = client.admin_initiate_auth( + UserPoolId=user_pool_id, + ClientId=client_id, + AuthFlow="ADMIN_NO_SRP_AUTH", + AuthParameters={"USERNAME": username, "PASSWORD": temporary_pass}, + ) + password = "Admin123!" + auth_result = client.respond_to_auth_challenge( + Session=auth_result["Session"], + ClientId=client_id, + ChallengeName="NEW_PASSWORD_REQUIRED", + ChallengeResponses={"USERNAME": username, "NEW_PASSWORD": password}, + ) + result = client.admin_get_user(UserPoolId=user_pool_id, Username=username) + result["UserStatus"].should.equal("CONFIRMED") + + # Return to CONFIRMED after user-initated password change + client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username) + client.change_password( + AccessToken=auth_result["AuthenticationResult"]["AccessToken"], + PreviousPassword=password, + ProposedPassword="Admin1234!", + ) + result = client.admin_get_user(UserPoolId=user_pool_id, Username=username) + result["UserStatus"].should.equal("CONFIRMED") + + +@mock_cognitoidp +def test_admin_reset_password_disabled_user(): + client = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + # Create pool + user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + # Create disabled user + client.admin_create_user( + UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4()) + ) + client.admin_disable_user(UserPoolId=user_pool_id, Username=username) + + with pytest.raises(ClientError) as ex: + client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username) + err = ex.value.response["Error"] + err["Code"].should.equal("NotAuthorizedException") + err["Message"].should.equal("User is disabled") + + +@mock_cognitoidp +def test_admin_reset_password_unconfirmed_user(): + client = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + # Create pool + user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + # Create user in status FORCE_CHANGE_PASSWORD + client.admin_create_user( + UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4()) + ) + + with pytest.raises(ClientError) as ex: + client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username) + err = ex.value.response["Error"] + err["Code"].should.equal("NotAuthorizedException") + err["Message"].should.equal("User password cannot be reset in the current state.") + + +@mock_cognitoidp +def test_admin_reset_password_no_verified_notification_channel(): + client = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + # Create pool and client + user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = client.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + # Create CONFIRMED user without verified email or phone + client.admin_create_user( + UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4()) + ) + client.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456" + ) + + with pytest.raises(ClientError) as ex: + client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username) + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidParameterException") + err["Message"].should.equal( + "Cannot reset password for the user as there is no registered/verified email or phone_number" + ) + + +@mock_cognitoidp +def test_admin_reset_password_multiple_invocations(): + client = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + # Create pool and client + user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = client.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + # Create CONFIRMED user with verified email + client.admin_create_user( + UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4()) + ) + client.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456" + ) + client.admin_update_user_attributes( + UserPoolId=user_pool_id, + Username=username, + UserAttributes=[{"Name": "email_verified", "Value": "true"}], + ) + + for _ in range(3): + try: + client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username) + user = client.admin_get_user(UserPoolId=user_pool_id, Username=username) + user["UserStatus"].should.equal("RESET_REQUIRED") + except ClientError: + pytest.fail("Shouldn't throw error on consecutive invocations") + + # Test will retrieve public key from cognito.amazonaws.com/.well-known/jwks.json, # which isnt mocked in ServerMode if not settings.TEST_SERVER_MODE: