Added admin_reset_user_password implementation (#4412)

This commit is contained in:
Łukasz 2021-10-14 12:12:08 +02:00 committed by GitHub
parent df1732c737
commit 230e34748f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 193 additions and 7 deletions

View File

@ -27,6 +27,7 @@ UserStatus = {
"FORCE_CHANGE_PASSWORD": "FORCE_CHANGE_PASSWORD",
"CONFIRMED": "CONFIRMED",
"UNCONFIRMED": "UNCONFIRMED",
"RESET_REQUIRED": "RESET_REQUIRED",
}
@ -267,6 +268,7 @@ class CognitoIdpUser(BaseModel):
self.status = status
self.enabled = True
self.attributes = attributes
self.attribute_lookup = self._flatten_attributes(attributes)
self.create_date = datetime.datetime.utcnow()
self.last_modified_date = datetime.datetime.utcnow()
self.sms_mfa_enabled = False
@ -306,15 +308,16 @@ class CognitoIdpUser(BaseModel):
return user_json
def update_attributes(self, new_attributes):
def flatten_attrs(attrs):
return {attr["Name"]: attr["Value"] for attr in attrs}
def _flatten_attributes(self, attributes):
return {attr["Name"]: attr["Value"] for attr in attributes}
def update_attributes(self, new_attributes):
def expand_attrs(attrs):
return [{"Name": k, "Value": v} for k, v in attrs.items()]
flat_attributes = flatten_attrs(self.attributes)
flat_attributes.update(flatten_attrs(new_attributes))
flat_attributes = self._flatten_attributes(self.attributes)
flat_attributes.update(self._flatten_attributes(new_attributes))
self.attribute_lookup = flat_attributes
self.attributes = expand_attrs(flat_attributes)
@ -597,6 +600,25 @@ class CognitoIdpBackend(BaseBackend):
group.users.discard(user)
user.groups.discard(group)
def admin_reset_user_password(self, user_pool_id, username):
user = self.admin_get_user(user_pool_id, username)
if not user.enabled:
raise NotAuthorizedError("User is disabled")
if user.status == UserStatus["RESET_REQUIRED"]:
return
if user.status != UserStatus["CONFIRMED"]:
raise NotAuthorizedError(
"User password cannot be reset in the current state."
)
if (
user.attribute_lookup.get("email_verified", "false") == "false"
and user.attribute_lookup.get("phone_number_verified", "false") == "false"
):
raise InvalidParameterException(
"Cannot reset password for the user as there is no registered/verified email or phone_number"
)
user.status = UserStatus["RESET_REQUIRED"]
# User
def admin_create_user(
self, user_pool_id, username, message_action, temporary_password, attributes
@ -710,7 +732,10 @@ class CognitoIdpBackend(BaseBackend):
if user.password != password:
raise NotAuthorizedError(username)
if user.status == UserStatus["FORCE_CHANGE_PASSWORD"]:
if user.status in [
UserStatus["FORCE_CHANGE_PASSWORD"],
UserStatus["RESET_REQUIRED"],
]:
session = str(uuid.uuid4())
self.sessions[session] = user_pool
@ -844,7 +869,10 @@ class CognitoIdpBackend(BaseBackend):
raise NotAuthorizedError(username)
user.password = proposed_password
if user.status == UserStatus["FORCE_CHANGE_PASSWORD"]:
if user.status in [
UserStatus["FORCE_CHANGE_PASSWORD"],
UserStatus["RESET_REQUIRED"],
]:
user.status = UserStatus["CONFIRMED"]
break

View File

@ -292,6 +292,14 @@ class CognitoIdpResponse(BaseResponse):
return ""
def admin_reset_user_password(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_reset_user_password(
user_pool_id, username
)
return ""
# User
def admin_create_user(self):
user_pool_id = self._get_param("UserPoolId")

View File

@ -2253,6 +2253,156 @@ def test_confirm_forgot_password_with_non_existent_client_id_raises_error():
ex.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
@mock_cognitoidp
def test_admin_reset_password_and_change_password():
client = boto3.client("cognito-idp", "us-west-2")
username = str(uuid.uuid4())
temporary_pass = str(uuid.uuid4())
# Create pool and client
user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
client_id = client.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
)["UserPoolClient"]["ClientId"]
# Create CONFIRMED user with verified email
client.admin_create_user(
UserPoolId=user_pool_id, Username=username, TemporaryPassword=temporary_pass
)
client.confirm_sign_up(
ClientId=client_id, Username=username, ConfirmationCode="123456"
)
client.admin_update_user_attributes(
UserPoolId=user_pool_id,
Username=username,
UserAttributes=[{"Name": "email_verified", "Value": "true"}],
)
# User should be in RESET_REQUIRED state after reset
client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username)
result = client.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["UserStatus"].should.equal("RESET_REQUIRED")
# Return to CONFIRMED status after NEW_PASSWORD_REQUIRED auth challenge
auth_result = client.admin_initiate_auth(
UserPoolId=user_pool_id,
ClientId=client_id,
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthParameters={"USERNAME": username, "PASSWORD": temporary_pass},
)
password = "Admin123!"
auth_result = client.respond_to_auth_challenge(
Session=auth_result["Session"],
ClientId=client_id,
ChallengeName="NEW_PASSWORD_REQUIRED",
ChallengeResponses={"USERNAME": username, "NEW_PASSWORD": password},
)
result = client.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["UserStatus"].should.equal("CONFIRMED")
# Return to CONFIRMED after user-initated password change
client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username)
client.change_password(
AccessToken=auth_result["AuthenticationResult"]["AccessToken"],
PreviousPassword=password,
ProposedPassword="Admin1234!",
)
result = client.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["UserStatus"].should.equal("CONFIRMED")
@mock_cognitoidp
def test_admin_reset_password_disabled_user():
client = boto3.client("cognito-idp", "us-west-2")
username = str(uuid.uuid4())
# Create pool
user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
# Create disabled user
client.admin_create_user(
UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4())
)
client.admin_disable_user(UserPoolId=user_pool_id, Username=username)
with pytest.raises(ClientError) as ex:
client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username)
err = ex.value.response["Error"]
err["Code"].should.equal("NotAuthorizedException")
err["Message"].should.equal("User is disabled")
@mock_cognitoidp
def test_admin_reset_password_unconfirmed_user():
client = boto3.client("cognito-idp", "us-west-2")
username = str(uuid.uuid4())
# Create pool
user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
# Create user in status FORCE_CHANGE_PASSWORD
client.admin_create_user(
UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4())
)
with pytest.raises(ClientError) as ex:
client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username)
err = ex.value.response["Error"]
err["Code"].should.equal("NotAuthorizedException")
err["Message"].should.equal("User password cannot be reset in the current state.")
@mock_cognitoidp
def test_admin_reset_password_no_verified_notification_channel():
client = boto3.client("cognito-idp", "us-west-2")
username = str(uuid.uuid4())
# Create pool and client
user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
client_id = client.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
)["UserPoolClient"]["ClientId"]
# Create CONFIRMED user without verified email or phone
client.admin_create_user(
UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4())
)
client.confirm_sign_up(
ClientId=client_id, Username=username, ConfirmationCode="123456"
)
with pytest.raises(ClientError) as ex:
client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterException")
err["Message"].should.equal(
"Cannot reset password for the user as there is no registered/verified email or phone_number"
)
@mock_cognitoidp
def test_admin_reset_password_multiple_invocations():
client = boto3.client("cognito-idp", "us-west-2")
username = str(uuid.uuid4())
# Create pool and client
user_pool_id = client.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
client_id = client.create_user_pool_client(
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
)["UserPoolClient"]["ClientId"]
# Create CONFIRMED user with verified email
client.admin_create_user(
UserPoolId=user_pool_id, Username=username, TemporaryPassword=str(uuid.uuid4())
)
client.confirm_sign_up(
ClientId=client_id, Username=username, ConfirmationCode="123456"
)
client.admin_update_user_attributes(
UserPoolId=user_pool_id,
Username=username,
UserAttributes=[{"Name": "email_verified", "Value": "true"}],
)
for _ in range(3):
try:
client.admin_reset_user_password(UserPoolId=user_pool_id, Username=username)
user = client.admin_get_user(UserPoolId=user_pool_id, Username=username)
user["UserStatus"].should.equal("RESET_REQUIRED")
except ClientError:
pytest.fail("Shouldn't throw error on consecutive invocations")
# Test will retrieve public key from cognito.amazonaws.com/.well-known/jwks.json,
# which isnt mocked in ServerMode
if not settings.TEST_SERVER_MODE: