Added significant verification to ForgotPassword, changed UserStatus dict to enum (#4469)

This commit is contained in:
Łukasz 2021-10-24 16:26:57 +02:00 committed by GitHub
parent a4a8949166
commit fee16cb388
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 211 additions and 30 deletions

View File

@ -4,6 +4,7 @@ import json
import os
import time
import uuid
import enum
from boto3 import Session
from jose import jws
from collections import OrderedDict
@ -28,12 +29,12 @@ from .utils import (
)
from moto.utilities.paginator import paginate
UserStatus = {
"FORCE_CHANGE_PASSWORD": "FORCE_CHANGE_PASSWORD",
"CONFIRMED": "CONFIRMED",
"UNCONFIRMED": "UNCONFIRMED",
"RESET_REQUIRED": "RESET_REQUIRED",
}
class UserStatus(str, enum.Enum):
FORCE_CHANGE_PASSWORD = "FORCE_CHANGE_PASSWORD"
CONFIRMED = "CONFIRMED"
UNCONFIRMED = "UNCONFIRMED"
RESET_REQUIRED = "RESET_REQUIRED"
class CognitoIdpUserPool(BaseModel):
@ -67,6 +68,19 @@ class CognitoIdpUserPool(BaseModel):
) as f:
self.json_web_key = json.loads(f.read())
def _account_recovery_setting(self):
# AccountRecoverySetting is not present in DescribeUserPool response if the pool was created without
# specifying it, ForgotPassword works on default settings nonetheless
return self.extended_config.get(
"AccountRecoverySetting",
{
"RecoveryMechanisms": [
{"Priority": 1, "Name": "verified_phone_number"},
{"Priority": 2, "Name": "verified_email"},
]
},
)
def _base_json(self):
return {
"Id": self.id,
@ -626,9 +640,9 @@ class CognitoIdpBackend(BaseBackend):
user = self.admin_get_user(user_pool_id, username)
if not user.enabled:
raise NotAuthorizedError("User is disabled")
if user.status == UserStatus["RESET_REQUIRED"]:
if user.status is UserStatus.RESET_REQUIRED:
return
if user.status != UserStatus["CONFIRMED"]:
if user.status is not UserStatus.CONFIRMED:
raise NotAuthorizedError(
"User password cannot be reset in the current state."
)
@ -639,7 +653,7 @@ class CognitoIdpBackend(BaseBackend):
raise InvalidParameterException(
"Cannot reset password for the user as there is no registered/verified email or phone_number"
)
user.status = UserStatus["RESET_REQUIRED"]
user.status = UserStatus.RESET_REQUIRED
# User
def admin_create_user(
@ -700,7 +714,7 @@ class CognitoIdpBackend(BaseBackend):
user_pool_id,
username,
temporary_password,
UserStatus["FORCE_CHANGE_PASSWORD"],
UserStatus.FORCE_CHANGE_PASSWORD,
attributes,
)
@ -737,7 +751,7 @@ class CognitoIdpBackend(BaseBackend):
if (
not user
or not user.enabled
or user.status != UserStatus["CONFIRMED"]
or user.status is not UserStatus.CONFIRMED
):
raise NotAuthorizedError("username")
return user
@ -809,8 +823,8 @@ class CognitoIdpBackend(BaseBackend):
raise NotAuthorizedError(username)
if user.status in [
UserStatus["FORCE_CHANGE_PASSWORD"],
UserStatus["RESET_REQUIRED"],
UserStatus.FORCE_CHANGE_PASSWORD,
UserStatus.RESET_REQUIRED,
]:
session = str(uuid.uuid4())
self.sessions[session] = user_pool
@ -862,7 +876,7 @@ class CognitoIdpBackend(BaseBackend):
raise UserNotFoundError(username)
user.password = new_password
user.status = UserStatus["CONFIRMED"]
user.status = UserStatus.CONFIRMED
del self.sessions[session]
return self._log_user_in(user_pool, client, username)
@ -933,6 +947,45 @@ class CognitoIdpBackend(BaseBackend):
else:
raise ResourceNotFoundError(client_id)
def forgot_password(self, client_id, username):
"""The ForgotPassword operation is partially broken in AWS. If the input is 100% correct it works fine.
Otherwise you get semi-random garbage and HTTP 200 OK, for example:
- recovery for username which is not registered in any cognito pool
- recovery for username belonging to a different user pool than the client id is registered to
- phone-based recovery for a user without phone_number / phone_number_verified attributes
- same as above, but email / email_verified
"""
for user_pool in self.user_pools.values():
if client_id in user_pool.clients:
recovery_settings = user_pool._account_recovery_setting()
user = user_pool._get_user(username)
break
else:
raise ResourceNotFoundError("Username/client id combination not found.")
code_delivery_details = {
"Destination": username + "@h***.com"
if not user
else user.attribute_lookup.get("email", username + "@h***.com"),
"DeliveryMedium": "EMAIL",
"AttributeName": "email",
}
selected_recovery = min(
recovery_settings["RecoveryMechanisms"],
key=lambda recovery_mechanism: recovery_mechanism["Priority"],
)
if selected_recovery["Name"] == "admin_only":
raise NotAuthorizedError("Contact administrator to reset password.")
if selected_recovery["Name"] == "verified_phone_number":
code_delivery_details = {
"Destination": "+*******9934"
if not user
else user.attribute_lookup.get("phone_number", "+*******9934"),
"DeliveryMedium": "SMS",
"AttributeName": "phone_number",
}
return {"CodeDeliveryDetails": code_delivery_details}
def change_password(self, access_token, previous_password, proposed_password):
for user_pool in self.user_pools.values():
if access_token in user_pool.access_tokens:
@ -946,10 +999,10 @@ class CognitoIdpBackend(BaseBackend):
user.password = proposed_password
if user.status in [
UserStatus["FORCE_CHANGE_PASSWORD"],
UserStatus["RESET_REQUIRED"],
UserStatus.FORCE_CHANGE_PASSWORD,
UserStatus.RESET_REQUIRED,
]:
user.status = UserStatus["CONFIRMED"]
user.status = UserStatus.CONFIRMED
break
else:
@ -1050,7 +1103,7 @@ class CognitoIdpBackend(BaseBackend):
username=username,
password=password,
attributes=attributes,
status=UserStatus["UNCONFIRMED"],
status=UserStatus.UNCONFIRMED,
)
user_pool.users[user.username] = user
return user
@ -1067,7 +1120,7 @@ class CognitoIdpBackend(BaseBackend):
if not user:
raise UserNotFoundError(username)
user.status = UserStatus["CONFIRMED"]
user.status = UserStatus.CONFIRMED
return ""
def initiate_auth(self, client_id, auth_flow, auth_parameters):
@ -1096,7 +1149,7 @@ class CognitoIdpBackend(BaseBackend):
if not user:
raise UserNotFoundError(username)
if user.status == UserStatus["UNCONFIRMED"]:
if user.status is UserStatus.UNCONFIRMED:
raise UserNotConfirmedException("User is not confirmed.")
session = str(uuid.uuid4())
@ -1125,7 +1178,7 @@ class CognitoIdpBackend(BaseBackend):
if user.password != password:
raise NotAuthorizedError("Incorrect username or password.")
if user.status == UserStatus["UNCONFIRMED"]:
if user.status is UserStatus.UNCONFIRMED:
raise UserNotConfirmedException("User is not confirmed.")
session = str(uuid.uuid4())
@ -1236,9 +1289,9 @@ class CognitoIdpBackend(BaseBackend):
user = self.admin_get_user(user_pool_id, username)
user.password = password
if permanent:
user.status = UserStatus["CONFIRMED"]
user.status = UserStatus.CONFIRMED
else:
user.status = UserStatus["FORCE_CHANGE_PASSWORD"]
user.status = UserStatus.FORCE_CHANGE_PASSWORD
cognitoidp_backends = {}

View File

@ -432,9 +432,11 @@ class CognitoIdpResponse(BaseResponse):
return json.dumps(auth_result)
def forgot_password(self):
return json.dumps(
{"CodeDeliveryDetails": {"DeliveryMedium": "EMAIL", "Destination": "..."}}
)
client_id = self._get_param("ClientId")
username = self._get_param("Username")
region = find_region_by_value("client_id", client_id)
response = cognitoidp_backends[region].forgot_password(client_id, username)
return json.dumps(response)
# This endpoint receives no authorization header, so if moto-server is listening
# on localhost (doesn't get a region in the host header), it doesn't know what

View File

@ -207,12 +207,21 @@ def test_describe_user_pool():
name = str(uuid.uuid4())
value = str(uuid.uuid4())
user_pool_details = conn.create_user_pool(
PoolName=name, LambdaConfig={"PreSignUp": value}
PoolName=name,
LambdaConfig={"PreSignUp": value},
AccountRecoverySetting={
"RecoveryMechanisms": [{"Name": "verified_email", "Priority": 1}]
},
)
result = conn.describe_user_pool(UserPoolId=user_pool_details["UserPool"]["Id"])
result["UserPool"]["Name"].should.equal(name)
result["UserPool"]["LambdaConfig"]["PreSignUp"].should.equal(value)
result["UserPool"]["AccountRecoverySetting"]["RecoveryMechanisms"][0][
"Name"
].should.equal("verified_email")
result["UserPool"]["AccountRecoverySetting"]["RecoveryMechanisms"][0][
"Priority"
].should.equal(1)
@mock_cognitoidp
@ -2153,9 +2162,126 @@ def test_change_password__using_custom_user_agent_header():
@mock_cognitoidp
def test_forgot_password():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
)["UserPoolClient"]["ClientId"]
result = conn.forgot_password(ClientId=client_id, Username=str(uuid.uuid4()))
result = conn.forgot_password(ClientId=create_id(), Username=str(uuid.uuid4()))
result["CodeDeliveryDetails"].should_not.be.none
result["CodeDeliveryDetails"]["Destination"].should.not_be.none
result["CodeDeliveryDetails"]["DeliveryMedium"].should.equal("SMS")
result["CodeDeliveryDetails"]["AttributeName"].should.equal("phone_number")
@mock_cognitoidp
def test_forgot_password_nonexistent_client_id():
conn = boto3.client("cognito-idp", "us-west-2")
with pytest.raises(ClientError) as ex:
conn.forgot_password(ClientId=create_id(), Username=str(uuid.uuid4()))
err = ex.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal("Username/client id combination not found.")
@mock_cognitoidp
def test_forgot_password_admin_only_recovery():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()),
AccountRecoverySetting={
"RecoveryMechanisms": [{"Name": "admin_only", "Priority": 1}]
},
)["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
)["UserPoolClient"]["ClientId"]
with pytest.raises(ClientError) as ex:
conn.forgot_password(ClientId=client_id, Username=str(uuid.uuid4()))
err = ex.value.response["Error"]
err["Code"].should.equal("NotAuthorizedException")
err["Message"].should.equal("Contact administrator to reset password.")
@mock_cognitoidp
def test_forgot_password_user_with_all_recovery_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()),
AccountRecoverySetting={
"RecoveryMechanisms": [{"Name": "verified_email", "Priority": 1}]
},
)["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
)["UserPoolClient"]["ClientId"]
username = str(uuid.uuid4())
conn.admin_create_user(
UserPoolId=user_pool_id,
Username=username,
UserAttributes=[
{"Name": "email", "Value": "test@moto.com"},
{"Name": "phone_number", "Value": "555555555"},
],
)
result = conn.forgot_password(ClientId=client_id, Username=username)
result["CodeDeliveryDetails"]["Destination"].should.equal("test@moto.com")
result["CodeDeliveryDetails"]["DeliveryMedium"].should.equal("EMAIL")
result["CodeDeliveryDetails"]["AttributeName"].should.equal("email")
conn.update_user_pool(
UserPoolId=user_pool_id,
AccountRecoverySetting={
"RecoveryMechanisms": [{"Name": "verified_phone_number", "Priority": 1}]
},
)
result = conn.forgot_password(ClientId=client_id, Username=username)
result["CodeDeliveryDetails"]["Destination"].should.equal("555555555")
result["CodeDeliveryDetails"]["DeliveryMedium"].should.equal("SMS")
result["CodeDeliveryDetails"]["AttributeName"].should.equal("phone_number")
@mock_cognitoidp
def test_forgot_password_nonexistent_user_or_user_without_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()),
AccountRecoverySetting={
"RecoveryMechanisms": [{"Name": "verified_email", "Priority": 1}]
},
)["UserPool"]["Id"]
client_id = conn.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
)["UserPoolClient"]["ClientId"]
user_without_attributes = str(uuid.uuid4())
nonexistent_user = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=user_without_attributes)
for user in user_without_attributes, nonexistent_user:
result = conn.forgot_password(ClientId=client_id, Username=user)
result["CodeDeliveryDetails"]["Destination"].should.equal(user + "@h***.com")
result["CodeDeliveryDetails"]["DeliveryMedium"].should.equal("EMAIL")
result["CodeDeliveryDetails"]["AttributeName"].should.equal("email")
conn.update_user_pool(
UserPoolId=user_pool_id,
AccountRecoverySetting={
"RecoveryMechanisms": [{"Name": "verified_phone_number", "Priority": 1}]
},
)
for user in user_without_attributes, nonexistent_user:
result = conn.forgot_password(ClientId=client_id, Username=user)
result["CodeDeliveryDetails"]["Destination"].should.equal("+*******9934")
result["CodeDeliveryDetails"]["DeliveryMedium"].should.equal("SMS")
result["CodeDeliveryDetails"]["AttributeName"].should.equal("phone_number")
@mock_cognitoidp