diff --git a/moto/cognitoidp/exceptions.py b/moto/cognitoidp/exceptions.py index d6ecf462a..21f738b14 100644 --- a/moto/cognitoidp/exceptions.py +++ b/moto/cognitoidp/exceptions.py @@ -51,6 +51,14 @@ class UserNotConfirmedException(BadRequest): ) +class ExpiredCodeException(BadRequest): + def __init__(self, message): + super(ExpiredCodeException, self).__init__() + self.description = json.dumps( + {"message": message, "__type": "ExpiredCodeException"} + ) + + class InvalidParameterException(JsonRESTError): def __init__(self, msg=None): self.code = 400 diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index 5cb0ed92d..46b46c5a9 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -5,6 +5,7 @@ import os import time import uuid import enum +import random from boto3 import Session from jose import jws from collections import OrderedDict @@ -18,6 +19,7 @@ from .exceptions import ( UsernameExistsException, UserNotConfirmedException, InvalidParameterException, + ExpiredCodeException, ) from .utils import ( create_id, @@ -310,6 +312,7 @@ class CognitoIdpUser(BaseModel): self.sms_mfa_enabled = False self.software_token_mfa_enabled = False self.token_verified = False + self.confirmation_code = None # Groups this user is a member of. # Note that these links are bidirectional. @@ -880,10 +883,19 @@ class CognitoIdpBackend(BaseBackend): else: return {} - def confirm_forgot_password(self, client_id, username, password): + def confirm_forgot_password(self, client_id, username, password, confirmation_code): for user_pool in self.user_pools.values(): if client_id in user_pool.clients and user_pool._get_user(username): - user_pool._get_user(username).password = password + user = user_pool._get_user(username) + if ( + confirmation_code.startswith("moto-confirmation-code:") + and user.confirmation_code != confirmation_code + ): + raise ExpiredCodeException( + "Invalid code provided, please request a code again." + ) + user.password = password + user.confirmation_code = None break else: raise ResourceNotFoundError(client_id) @@ -904,6 +916,17 @@ class CognitoIdpBackend(BaseBackend): else: raise ResourceNotFoundError("Username/client id combination not found.") + confirmation_code = None + if user: + # An unfortunate bit of magic - confirmation_code is opt-in, as it's returned + # via a "x-moto-forgot-password-confirmation-code" http header, which is not the AWS way (should be SES, SNS, Cognito built-in email) + # Verification of user.confirmation_code vs received code will be performed only for codes + # beginning with 'moto-confirmation-code' prefix. All other codes are considered VALID. + confirmation_code = ( + f"moto-confirmation-code:{random.randint(100_000, 999_999)}" + ) + user.confirmation_code = confirmation_code + code_delivery_details = { "Destination": username + "@h***.com" if not user @@ -925,7 +948,7 @@ class CognitoIdpBackend(BaseBackend): "DeliveryMedium": "SMS", "AttributeName": "phone_number", } - return {"CodeDeliveryDetails": code_delivery_details} + return confirmation_code, {"CodeDeliveryDetails": code_delivery_details} def change_password(self, access_token, previous_password, proposed_password): for user_pool in self.user_pools.values(): diff --git a/moto/cognitoidp/responses.py b/moto/cognitoidp/responses.py index aa2433d28..9a6468847 100644 --- a/moto/cognitoidp/responses.py +++ b/moto/cognitoidp/responses.py @@ -435,7 +435,12 @@ class CognitoIdpResponse(BaseResponse): client_id = self._get_param("ClientId") username = self._get_param("Username") region = find_region_by_value("client_id", client_id) - response = cognitoidp_backends[region].forgot_password(client_id, username) + confirmation_code, response = cognitoidp_backends[region].forgot_password( + client_id, username + ) + self.response_headers[ + "x-moto-forgot-password-confirmation-code" + ] = confirmation_code return json.dumps(response) # This endpoint receives no authorization header, so if moto-server is listening @@ -446,9 +451,10 @@ class CognitoIdpResponse(BaseResponse): client_id = self._get_param("ClientId") username = self._get_param("Username") password = self._get_param("Password") + confirmation_code = self._get_param("ConfirmationCode") region = find_region_by_value("client_id", client_id) cognitoidp_backends[region].confirm_forgot_password( - client_id, username, password + client_id, username, password, confirmation_code ) return "" diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index 78be1b2a8..3ba986174 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -2163,7 +2163,6 @@ def test_forgot_password(): UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()) )["UserPoolClient"]["ClientId"] result = conn.forgot_password(ClientId=client_id, Username=str(uuid.uuid4())) - result["CodeDeliveryDetails"]["Destination"].should.not_be.none result["CodeDeliveryDetails"]["DeliveryMedium"].should.equal("SMS") result["CodeDeliveryDetails"]["AttributeName"].should.equal("phone_number") @@ -2281,7 +2280,7 @@ def test_forgot_password_nonexistent_user_or_user_without_attributes(): @mock_cognitoidp -def test_confirm_forgot_password(): +def test_confirm_forgot_password_legacy(): conn = boto3.client("cognito-idp", "us-west-2") username = str(uuid.uuid4()) @@ -2289,18 +2288,76 @@ def test_confirm_forgot_password(): client_id = conn.create_user_pool_client( UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()) )["UserPoolClient"]["ClientId"] - conn.admin_create_user( UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4()) ) - conn.confirm_forgot_password( + # Random confirmation code - opt out of verification + conn.forgot_password(ClientId=client_id, Username=username) + res = conn.confirm_forgot_password( ClientId=client_id, Username=username, ConfirmationCode=str(uuid.uuid4()), Password=str(uuid.uuid4()), ) + res["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + + +@mock_cognitoidp +def test_confirm_forgot_password_opt_in_verification(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()) + )["UserPoolClient"]["ClientId"] + conn.admin_create_user( + UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4()) + ) + + res = conn.forgot_password(ClientId=client_id, Username=username) + + confirmation_code = res["ResponseMetadata"]["HTTPHeaders"][ + "x-moto-forgot-password-confirmation-code" + ] + confirmation_code.should.match(r"moto-confirmation-code:[0-9]{6}", re.I) + + res = conn.confirm_forgot_password( + ClientId=client_id, + Username=username, + ConfirmationCode=confirmation_code, + Password=str(uuid.uuid4()), + ) + + res["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + + +@mock_cognitoidp +def test_confirm_forgot_password_opt_in_verification_invalid_confirmation_code(): + conn = boto3.client("cognito-idp", "us-west-2") + + username = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()) + )["UserPoolClient"]["ClientId"] + conn.admin_create_user( + UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4()) + ) + + with pytest.raises(ClientError) as ex: + conn.confirm_forgot_password( + ClientId=client_id, + Username=username, + ConfirmationCode="moto-confirmation-code:123invalid", + Password=str(uuid.uuid4()), + ) + err = ex.value.response["Error"] + err["Code"].should.equal("ExpiredCodeException") + err["Message"].should.equal("Invalid code provided, please request a code again.") + @mock_cognitoidp def test_admin_user_global_sign_out():