CognitoIDP - Validate password using policy (#5768)
This commit is contained in:
parent
860d8bf4b7
commit
c498c14ba3
@ -43,3 +43,11 @@ class InvalidParameterException(JsonRESTError):
|
||||
super().__init__(
|
||||
"InvalidParameterException", msg or "A parameter is specified incorrectly."
|
||||
)
|
||||
|
||||
|
||||
class InvalidPasswordException(JsonRESTError):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(
|
||||
error_type="InvalidPasswordException",
|
||||
message="The provided password does not confirm to the configured password policy",
|
||||
)
|
||||
|
@ -5,8 +5,6 @@ import time
|
||||
import typing
|
||||
import enum
|
||||
import re
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
from jose import jws
|
||||
from collections import OrderedDict
|
||||
from typing import Any, Dict, List, Tuple, Optional, Set
|
||||
@ -43,10 +41,6 @@ class UserStatus(str, enum.Enum):
|
||||
UNCONFIRMED = "UNCONFIRMED"
|
||||
RESET_REQUIRED = "RESET_REQUIRED"
|
||||
|
||||
class InvalidPasswordException(Exception):
|
||||
"Raised when the input value is less than 18"
|
||||
pass
|
||||
|
||||
|
||||
class AuthFlow(str, enum.Enum):
|
||||
# Order follows AWS' order
|
||||
@ -1612,6 +1606,10 @@ class CognitoIdpBackend(BaseBackend):
|
||||
) -> None:
|
||||
for user_pool in self.user_pools.values():
|
||||
if access_token in user_pool.access_tokens:
|
||||
self._validate_password(
|
||||
user_pool_id=user_pool.id, password=proposed_password
|
||||
)
|
||||
|
||||
_, username = user_pool.access_tokens[access_token]
|
||||
user = self.admin_get_user(user_pool.id, username)
|
||||
|
||||
@ -1730,6 +1728,8 @@ class CognitoIdpBackend(BaseBackend):
|
||||
"Username should be either an email or a phone number."
|
||||
)
|
||||
|
||||
self._validate_password(user_pool.id, password)
|
||||
|
||||
user = CognitoIdpUser(
|
||||
user_pool_id=user_pool.id,
|
||||
# set username to None so that it will be default to the internal GUID
|
||||
@ -1970,43 +1970,41 @@ class CognitoIdpBackend(BaseBackend):
|
||||
if sms_mfa_settings.get("PreferredMfa"):
|
||||
user.preferred_mfa_setting = "SMS_MFA"
|
||||
return None
|
||||
|
||||
def validate_password(password):
|
||||
tmp = password
|
||||
lgt = len(tmp)
|
||||
try:
|
||||
if(lgt > 5 and lgt <99):
|
||||
flagl = True
|
||||
else:
|
||||
flagl = False
|
||||
flagn = bool(re.match("\d", tmp))
|
||||
sc = "^ $ * . [ ] { } ( ) ? ! @ # % & / \ , > < ' : ; | _ ~ ` = + -"
|
||||
for i in tmp:
|
||||
if i in sc:
|
||||
flagsc = True
|
||||
break
|
||||
else:
|
||||
flagsc = False
|
||||
|
||||
flagu = bool(re.match('[A-Z]+', tmp))
|
||||
flaglo = bool(re.match('[a-z]+', tmp))
|
||||
if(flagl and flagn and flagsc and flagu and flaglo):
|
||||
return True
|
||||
else:
|
||||
raise InvalidPasswordException("The Password is invalid")
|
||||
except ClientError as e:
|
||||
print(e)
|
||||
|
||||
|
||||
def _validate_password(self, user_pool_id: str, password: str) -> None:
|
||||
user_pool = self.describe_user_pool(user_pool_id)
|
||||
password_policy = user_pool.extended_config.get("Policies", {}).get(
|
||||
"PasswordPolicy", {}
|
||||
)
|
||||
minimum = password_policy.get("MinimumLength", 5)
|
||||
maximum = password_policy.get("MaximumLength", 99)
|
||||
require_uppercase = password_policy.get("RequireUppercase", True)
|
||||
require_lowercase = password_policy.get("RequireLowercase", True)
|
||||
require_numbers = password_policy.get("RequireNumbers", True)
|
||||
require_symbols = password_policy.get("RequireSymbols", True)
|
||||
|
||||
flagl = minimum <= len(password) < maximum
|
||||
flagn = not require_numbers or bool(re.search(r"\d", password))
|
||||
# If we require symbols, we assume False - and check a symbol is present
|
||||
# If we don't require symbols, we assume True - and we could technically skip the for-loop
|
||||
flag_sc = not require_symbols
|
||||
sc = "^ $ * . [ ] { } ( ) ? ! @ # % & / \\ , > < ' : ; | _ ~ ` = + -"
|
||||
for i in password:
|
||||
if i in sc:
|
||||
flag_sc = True
|
||||
|
||||
flag_u = not require_uppercase or bool(re.search(r"[A-Z]+", password))
|
||||
flag_lo = not require_lowercase or bool(re.search(r"[a-z]+", password))
|
||||
if not (flagl and flagn and flag_sc and flag_u and flag_lo):
|
||||
raise InvalidPasswordException()
|
||||
|
||||
def admin_set_user_password(
|
||||
self, user_pool_id: str, username: str, password: str, permanent: bool
|
||||
) -> None:
|
||||
user = self.admin_get_user(user_pool_id, username)
|
||||
#user.password = password
|
||||
flag = False
|
||||
flag = validate_password(password)
|
||||
if(flag == True):
|
||||
user.password = password
|
||||
# user.password = password
|
||||
self._validate_password(user_pool_id, password)
|
||||
user.password = password
|
||||
if permanent:
|
||||
user.status = UserStatus.CONFIRMED
|
||||
else:
|
||||
|
@ -1424,7 +1424,7 @@ def test_group_in_access_token():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
temporary_password = str(uuid.uuid4())
|
||||
temporary_password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
user_attribute_name = str(uuid.uuid4())
|
||||
user_attribute_value = str(uuid.uuid4())
|
||||
@ -1460,7 +1460,7 @@ def test_group_in_access_token():
|
||||
result["Session"].should_not.equal(None)
|
||||
|
||||
# This sets a new password and logs the user in (creates tokens)
|
||||
new_password = str(uuid.uuid4())
|
||||
new_password = "P2$Sword"
|
||||
result = conn.respond_to_auth_challenge(
|
||||
Session=result["Session"],
|
||||
ClientId=client_id,
|
||||
@ -2600,7 +2600,7 @@ def test_admin_delete_user_with_username_attributes():
|
||||
|
||||
def authentication_flow(conn, auth_flow):
|
||||
username = str(uuid.uuid4())
|
||||
temporary_password = str(uuid.uuid4())
|
||||
temporary_password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
user_attribute_name = str(uuid.uuid4())
|
||||
user_attribute_value = str(uuid.uuid4())
|
||||
@ -2629,7 +2629,7 @@ def authentication_flow(conn, auth_flow):
|
||||
result["Session"].should_not.equal(None)
|
||||
|
||||
# This sets a new password and logs the user in (creates tokens)
|
||||
new_password = str(uuid.uuid4())
|
||||
new_password = "P2$Sword"
|
||||
result = conn.respond_to_auth_challenge(
|
||||
Session=result["Session"],
|
||||
ClientId=client_id,
|
||||
@ -2691,7 +2691,7 @@ def test_authentication_flow_invalid_user_flow():
|
||||
|
||||
def user_authentication_flow(conn):
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
user_attribute_name = str(uuid.uuid4())
|
||||
user_attribute_value = str(uuid.uuid4())
|
||||
@ -2857,7 +2857,7 @@ def test_change_password():
|
||||
outputs = authentication_flow(conn, auth_flow)
|
||||
|
||||
# Take this opportunity to test change_password, which requires an access token.
|
||||
newer_password = str(uuid.uuid4())
|
||||
newer_password = "P2$Sword2"
|
||||
conn.change_password(
|
||||
AccessToken=outputs["access_token"],
|
||||
PreviousPassword=outputs["password"],
|
||||
@ -2893,7 +2893,7 @@ def test_change_password__using_custom_user_agent_header():
|
||||
outputs = authentication_flow(conn, auth_flow)
|
||||
|
||||
# Take this opportunity to test change_password, which requires an access token.
|
||||
newer_password = str(uuid.uuid4())
|
||||
newer_password = "P2$Sword2"
|
||||
conn.change_password(
|
||||
AccessToken=outputs["access_token"],
|
||||
PreviousPassword=outputs["password"],
|
||||
@ -3458,12 +3458,28 @@ def test_sign_up():
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
result = conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
result["UserConfirmed"].should.equal(False)
|
||||
result["UserSub"].should_not.equal(None)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
@pytest.mark.parametrize("password", ["p2$$word", "P2$s"])
|
||||
def test_sign_up_with_invalid_password(password):
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
username = str(uuid.uuid4())
|
||||
|
||||
with pytest.raises(ClientError) as exc:
|
||||
conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
err = exc.value.response["Error"]
|
||||
err["Code"].should.equal("InvalidPasswordException")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_sign_up_with_username_attributes():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
@ -3474,7 +3490,7 @@ def test_sign_up_with_username_attributes():
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
with pytest.raises(ClientError) as err:
|
||||
# Attempt to add user again
|
||||
conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
@ -3500,7 +3516,7 @@ def test_sign_up_existing_user():
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4())
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
|
||||
# Add initial user
|
||||
conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
@ -3516,7 +3532,7 @@ def test_sign_up_existing_user():
|
||||
def test_confirm_sign_up():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True
|
||||
@ -3535,7 +3551,7 @@ def test_confirm_sign_up():
|
||||
def test_confirm_sign_up_with_username_attributes():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = "test@example.com"
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(
|
||||
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
|
||||
)["UserPool"]["Id"]
|
||||
@ -3556,7 +3572,7 @@ def test_confirm_sign_up_with_username_attributes():
|
||||
def test_initiate_auth_USER_SRP_AUTH():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True
|
||||
@ -3592,7 +3608,7 @@ def test_initiate_auth_USER_SRP_AUTH():
|
||||
def test_initiate_auth_USER_SRP_AUTH_with_username_attributes():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = "test@example.com"
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(
|
||||
PoolName=str(uuid.uuid4()), UsernameAttributes=["email"]
|
||||
)["UserPool"]["Id"]
|
||||
@ -3726,7 +3742,7 @@ def test_initiate_auth_USER_PASSWORD_AUTH_with_FORCE_CHANGE_PASSWORD_status():
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
|
||||
# Create user in status FORCE_CHANGE_PASSWORD
|
||||
temporary_password = str(uuid.uuid4())
|
||||
temporary_password = "P2$Sword"
|
||||
client.admin_create_user(
|
||||
UserPoolId=user_pool_id, Username=username, TemporaryPassword=temporary_password
|
||||
)
|
||||
@ -3742,7 +3758,7 @@ def test_initiate_auth_USER_PASSWORD_AUTH_with_FORCE_CHANGE_PASSWORD_status():
|
||||
result["Session"].should_not.equal("")
|
||||
assert result.get("AuthenticationResult") is None
|
||||
|
||||
new_password = str(uuid.uuid4())
|
||||
new_password = "P2$Sword2"
|
||||
result = client.respond_to_auth_challenge(
|
||||
ClientId=client_id,
|
||||
ChallengeName="NEW_PASSWORD_REQUIRED",
|
||||
@ -3792,7 +3808,7 @@ def test_initiate_auth_USER_PASSWORD_AUTH_user_incorrect_password():
|
||||
def test_initiate_auth_USER_PASSWORD_AUTH_unconfirmed_user():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True
|
||||
@ -3813,7 +3829,7 @@ def test_initiate_auth_USER_PASSWORD_AUTH_unconfirmed_user():
|
||||
def test_initiate_auth_for_unconfirmed_user():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True
|
||||
@ -3846,7 +3862,7 @@ def test_initiate_auth_for_unconfirmed_user():
|
||||
def test_initiate_auth_with_invalid_secret_hash():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$Sword"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True
|
||||
@ -4000,7 +4016,7 @@ def test_admin_set_user_password():
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
value = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
password = "P2$$word"
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
conn.admin_create_user(
|
||||
UserPoolId=user_pool_id,
|
||||
@ -4022,6 +4038,73 @@ def test_admin_set_user_password():
|
||||
_verify_attribute("thing", value)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
@pytest.mark.parametrize("password", ["pa$$word", "Password", "p2ssword", "P2$S"])
|
||||
def test_admin_set_invalid_user_password(password):
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
value = str(uuid.uuid4())
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
conn.admin_create_user(
|
||||
UserPoolId=user_pool_id,
|
||||
Username=username,
|
||||
UserAttributes=[{"Name": "thing", "Value": value}],
|
||||
)
|
||||
with pytest.raises(ClientError) as exc:
|
||||
conn.admin_set_user_password(
|
||||
UserPoolId=user_pool_id,
|
||||
Username=username,
|
||||
Password=password,
|
||||
Permanent=True,
|
||||
)
|
||||
err = exc.value.response["Error"]
|
||||
err["Code"].should.equal("InvalidPasswordException")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
@pytest.mark.parametrize("password", ["password", "P2$$word"])
|
||||
def test_admin_set_invalid_user_password__custom_policy_provided(password):
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
username = str(uuid.uuid4())
|
||||
value = str(uuid.uuid4())
|
||||
user_pool_id = conn.create_user_pool(
|
||||
PoolName=str(uuid.uuid4()),
|
||||
Policies={
|
||||
"PasswordPolicy": {
|
||||
"MinimumLength": 12,
|
||||
"RequireUppercase": False,
|
||||
"RequireLowercase": False,
|
||||
"RequireNumbers": False,
|
||||
"RequireSymbols": False,
|
||||
}
|
||||
},
|
||||
)["UserPool"]["Id"]
|
||||
conn.admin_create_user(
|
||||
UserPoolId=user_pool_id,
|
||||
Username=username,
|
||||
UserAttributes=[{"Name": "thing", "Value": value}],
|
||||
)
|
||||
with pytest.raises(ClientError) as exc:
|
||||
conn.admin_set_user_password(
|
||||
UserPoolId=user_pool_id,
|
||||
Username=username,
|
||||
Password=password,
|
||||
Permanent=True,
|
||||
)
|
||||
err = exc.value.response["Error"]
|
||||
err["Code"].should.equal("InvalidPasswordException")
|
||||
|
||||
# We can set a plain password, as long as it's 12 characters long
|
||||
conn.admin_set_user_password(
|
||||
UserPoolId=user_pool_id,
|
||||
Username=username,
|
||||
Password="longpassword",
|
||||
Permanent=True,
|
||||
)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_change_password_with_invalid_token_raises_error():
|
||||
client = boto3.client("cognito-idp", "us-west-2")
|
||||
|
@ -18,18 +18,21 @@ class TestCognitoUserDeleter(TestCase):
|
||||
|
||||
def test_authenticate_with_signed_out_user(self):
|
||||
self.client.admin_create_user(
|
||||
UserPoolId=self.pool_id, Username="foo", TemporaryPassword="bar"
|
||||
UserPoolId=self.pool_id, Username="foo", TemporaryPassword="P2$Sword"
|
||||
)
|
||||
|
||||
self.client.admin_set_user_password(
|
||||
UserPoolId=self.pool_id, Username="foo", Password="bar", Permanent=True
|
||||
UserPoolId=self.pool_id,
|
||||
Username="foo",
|
||||
Password="P2$Sword2",
|
||||
Permanent=True,
|
||||
)
|
||||
|
||||
response = self.client.admin_initiate_auth(
|
||||
UserPoolId=self.pool_id,
|
||||
ClientId=self.client_id,
|
||||
AuthFlow="ADMIN_USER_PASSWORD_AUTH",
|
||||
AuthParameters={"USERNAME": "foo", "PASSWORD": "bar"},
|
||||
AuthParameters={"USERNAME": "foo", "PASSWORD": "P2$Sword2"},
|
||||
)
|
||||
|
||||
refresh_token = response["AuthenticationResult"]["RefreshToken"]
|
||||
|
@ -47,7 +47,7 @@ def test_sign_up_user_without_authentication():
|
||||
json.loads(res.data)["UserPoolClients"].should.have.length_of(1)
|
||||
|
||||
# Sign Up User
|
||||
data = {"ClientId": client_id, "Username": "test@gmail.com", "Password": "12345678"}
|
||||
data = {"ClientId": client_id, "Username": "test@gmail.com", "Password": "P2$Sword"}
|
||||
res = test_client.post(
|
||||
"/",
|
||||
data=json.dumps(data),
|
||||
@ -72,7 +72,7 @@ def test_sign_up_user_without_authentication():
|
||||
data = {
|
||||
"ClientId": client_id,
|
||||
"AuthFlow": "USER_PASSWORD_AUTH",
|
||||
"AuthParameters": {"USERNAME": "test@gmail.com", "PASSWORD": "12345678"},
|
||||
"AuthParameters": {"USERNAME": "test@gmail.com", "PASSWORD": "P2$Sword"},
|
||||
}
|
||||
res = test_client.post(
|
||||
"/",
|
||||
|
Loading…
Reference in New Issue
Block a user