Return OTP from forgot password (#4478)

This commit is contained in:
Łukasz 2021-10-27 20:29:29 +02:00 committed by GitHub
parent f240de5482
commit ec7c2d6d5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 103 additions and 9 deletions

View File

@ -51,6 +51,14 @@ class UserNotConfirmedException(BadRequest):
) )
class ExpiredCodeException(BadRequest):
def __init__(self, message):
super(ExpiredCodeException, self).__init__()
self.description = json.dumps(
{"message": message, "__type": "ExpiredCodeException"}
)
class InvalidParameterException(JsonRESTError): class InvalidParameterException(JsonRESTError):
def __init__(self, msg=None): def __init__(self, msg=None):
self.code = 400 self.code = 400

View File

@ -5,6 +5,7 @@ import os
import time import time
import uuid import uuid
import enum import enum
import random
from boto3 import Session from boto3 import Session
from jose import jws from jose import jws
from collections import OrderedDict from collections import OrderedDict
@ -18,6 +19,7 @@ from .exceptions import (
UsernameExistsException, UsernameExistsException,
UserNotConfirmedException, UserNotConfirmedException,
InvalidParameterException, InvalidParameterException,
ExpiredCodeException,
) )
from .utils import ( from .utils import (
create_id, create_id,
@ -310,6 +312,7 @@ class CognitoIdpUser(BaseModel):
self.sms_mfa_enabled = False self.sms_mfa_enabled = False
self.software_token_mfa_enabled = False self.software_token_mfa_enabled = False
self.token_verified = False self.token_verified = False
self.confirmation_code = None
# Groups this user is a member of. # Groups this user is a member of.
# Note that these links are bidirectional. # Note that these links are bidirectional.
@ -880,10 +883,19 @@ class CognitoIdpBackend(BaseBackend):
else: else:
return {} return {}
def confirm_forgot_password(self, client_id, username, password): def confirm_forgot_password(self, client_id, username, password, confirmation_code):
for user_pool in self.user_pools.values(): for user_pool in self.user_pools.values():
if client_id in user_pool.clients and user_pool._get_user(username): if client_id in user_pool.clients and user_pool._get_user(username):
user_pool._get_user(username).password = password user = user_pool._get_user(username)
if (
confirmation_code.startswith("moto-confirmation-code:")
and user.confirmation_code != confirmation_code
):
raise ExpiredCodeException(
"Invalid code provided, please request a code again."
)
user.password = password
user.confirmation_code = None
break break
else: else:
raise ResourceNotFoundError(client_id) raise ResourceNotFoundError(client_id)
@ -904,6 +916,17 @@ class CognitoIdpBackend(BaseBackend):
else: else:
raise ResourceNotFoundError("Username/client id combination not found.") raise ResourceNotFoundError("Username/client id combination not found.")
confirmation_code = None
if user:
# An unfortunate bit of magic - confirmation_code is opt-in, as it's returned
# via a "x-moto-forgot-password-confirmation-code" http header, which is not the AWS way (should be SES, SNS, Cognito built-in email)
# Verification of user.confirmation_code vs received code will be performed only for codes
# beginning with 'moto-confirmation-code' prefix. All other codes are considered VALID.
confirmation_code = (
f"moto-confirmation-code:{random.randint(100_000, 999_999)}"
)
user.confirmation_code = confirmation_code
code_delivery_details = { code_delivery_details = {
"Destination": username + "@h***.com" "Destination": username + "@h***.com"
if not user if not user
@ -925,7 +948,7 @@ class CognitoIdpBackend(BaseBackend):
"DeliveryMedium": "SMS", "DeliveryMedium": "SMS",
"AttributeName": "phone_number", "AttributeName": "phone_number",
} }
return {"CodeDeliveryDetails": code_delivery_details} return confirmation_code, {"CodeDeliveryDetails": code_delivery_details}
def change_password(self, access_token, previous_password, proposed_password): def change_password(self, access_token, previous_password, proposed_password):
for user_pool in self.user_pools.values(): for user_pool in self.user_pools.values():

View File

@ -435,7 +435,12 @@ class CognitoIdpResponse(BaseResponse):
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
username = self._get_param("Username") username = self._get_param("Username")
region = find_region_by_value("client_id", client_id) region = find_region_by_value("client_id", client_id)
response = cognitoidp_backends[region].forgot_password(client_id, username) confirmation_code, response = cognitoidp_backends[region].forgot_password(
client_id, username
)
self.response_headers[
"x-moto-forgot-password-confirmation-code"
] = confirmation_code
return json.dumps(response) return json.dumps(response)
# This endpoint receives no authorization header, so if moto-server is listening # This endpoint receives no authorization header, so if moto-server is listening
@ -446,9 +451,10 @@ class CognitoIdpResponse(BaseResponse):
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
username = self._get_param("Username") username = self._get_param("Username")
password = self._get_param("Password") password = self._get_param("Password")
confirmation_code = self._get_param("ConfirmationCode")
region = find_region_by_value("client_id", client_id) region = find_region_by_value("client_id", client_id)
cognitoidp_backends[region].confirm_forgot_password( cognitoidp_backends[region].confirm_forgot_password(
client_id, username, password client_id, username, password, confirmation_code
) )
return "" return ""

View File

@ -2163,7 +2163,6 @@ def test_forgot_password():
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()) UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
)["UserPoolClient"]["ClientId"] )["UserPoolClient"]["ClientId"]
result = conn.forgot_password(ClientId=client_id, Username=str(uuid.uuid4())) result = conn.forgot_password(ClientId=client_id, Username=str(uuid.uuid4()))
result["CodeDeliveryDetails"]["Destination"].should.not_be.none result["CodeDeliveryDetails"]["Destination"].should.not_be.none
result["CodeDeliveryDetails"]["DeliveryMedium"].should.equal("SMS") result["CodeDeliveryDetails"]["DeliveryMedium"].should.equal("SMS")
result["CodeDeliveryDetails"]["AttributeName"].should.equal("phone_number") result["CodeDeliveryDetails"]["AttributeName"].should.equal("phone_number")
@ -2281,7 +2280,7 @@ def test_forgot_password_nonexistent_user_or_user_without_attributes():
@mock_cognitoidp @mock_cognitoidp
def test_confirm_forgot_password(): def test_confirm_forgot_password_legacy():
conn = boto3.client("cognito-idp", "us-west-2") conn = boto3.client("cognito-idp", "us-west-2")
username = str(uuid.uuid4()) username = str(uuid.uuid4())
@ -2289,18 +2288,76 @@ def test_confirm_forgot_password():
client_id = conn.create_user_pool_client( client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()) UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
)["UserPoolClient"]["ClientId"] )["UserPoolClient"]["ClientId"]
conn.admin_create_user( conn.admin_create_user(
UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4()) UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4())
) )
conn.confirm_forgot_password( # Random confirmation code - opt out of verification
conn.forgot_password(ClientId=client_id, Username=username)
res = conn.confirm_forgot_password(
ClientId=client_id, ClientId=client_id,
Username=username, Username=username,
ConfirmationCode=str(uuid.uuid4()), ConfirmationCode=str(uuid.uuid4()),
Password=str(uuid.uuid4()), Password=str(uuid.uuid4()),
) )
res["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
@mock_cognitoidp
def test_confirm_forgot_password_opt_in_verification():
conn = boto3.client("cognito-idp", "us-west-2")
username = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
)["UserPoolClient"]["ClientId"]
conn.admin_create_user(
UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4())
)
res = conn.forgot_password(ClientId=client_id, Username=username)
confirmation_code = res["ResponseMetadata"]["HTTPHeaders"][
"x-moto-forgot-password-confirmation-code"
]
confirmation_code.should.match(r"moto-confirmation-code:[0-9]{6}", re.I)
res = conn.confirm_forgot_password(
ClientId=client_id,
Username=username,
ConfirmationCode=confirmation_code,
Password=str(uuid.uuid4()),
)
res["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
@mock_cognitoidp
def test_confirm_forgot_password_opt_in_verification_invalid_confirmation_code():
conn = boto3.client("cognito-idp", "us-west-2")
username = str(uuid.uuid4())
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
)["UserPoolClient"]["ClientId"]
conn.admin_create_user(
UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4())
)
with pytest.raises(ClientError) as ex:
conn.confirm_forgot_password(
ClientId=client_id,
Username=username,
ConfirmationCode="moto-confirmation-code:123invalid",
Password=str(uuid.uuid4()),
)
err = ex.value.response["Error"]
err["Code"].should.equal("ExpiredCodeException")
err["Message"].should.equal("Invalid code provided, please request a code again.")
@mock_cognitoidp @mock_cognitoidp
def test_admin_user_global_sign_out(): def test_admin_user_global_sign_out():