Support cognito admin user password auth flow (#3547)
Applies the user credentials pattern from the ADMIN_NO_SRP_AUTH flow to the ADMIN_USER_PASSWORD_AUTH auth flow for Cognito admin_initiate_auth requests. Co-authored-by: Robin Wilkins <r.wilkins@waracle.com>
This commit is contained in:
parent
c9c30b8286
commit
a31599d000
@ -719,6 +719,27 @@ class CognitoIdpBackend(BaseBackend):
|
|||||||
"Session": session,
|
"Session": session,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return self._log_user_in(user_pool, client, username)
|
||||||
|
elif auth_flow == "ADMIN_USER_PASSWORD_AUTH":
|
||||||
|
username = auth_parameters.get("USERNAME")
|
||||||
|
password = auth_parameters.get("PASSWORD")
|
||||||
|
user = user_pool.users.get(username)
|
||||||
|
if not user:
|
||||||
|
raise UserNotFoundError(username)
|
||||||
|
|
||||||
|
if user.password != password:
|
||||||
|
raise NotAuthorizedError(username)
|
||||||
|
|
||||||
|
if user.status == UserStatus["FORCE_CHANGE_PASSWORD"]:
|
||||||
|
session = str(uuid.uuid4())
|
||||||
|
self.sessions[session] = user_pool
|
||||||
|
|
||||||
|
return {
|
||||||
|
"ChallengeName": "NEW_PASSWORD_REQUIRED",
|
||||||
|
"ChallengeParameters": {},
|
||||||
|
"Session": session,
|
||||||
|
}
|
||||||
|
|
||||||
return self._log_user_in(user_pool, client, username)
|
return self._log_user_in(user_pool, client, username)
|
||||||
elif auth_flow == "REFRESH_TOKEN":
|
elif auth_flow == "REFRESH_TOKEN":
|
||||||
refresh_token = auth_parameters.get("REFRESH_TOKEN")
|
refresh_token = auth_parameters.get("REFRESH_TOKEN")
|
||||||
|
@ -1198,7 +1198,7 @@ def test_admin_delete_user():
|
|||||||
caught.should.be.true
|
caught.should.be.true
|
||||||
|
|
||||||
|
|
||||||
def authentication_flow(conn):
|
def authentication_flow(conn, auth_flow):
|
||||||
username = str(uuid.uuid4())
|
username = str(uuid.uuid4())
|
||||||
temporary_password = str(uuid.uuid4())
|
temporary_password = str(uuid.uuid4())
|
||||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||||
@ -1220,7 +1220,7 @@ def authentication_flow(conn):
|
|||||||
result = conn.admin_initiate_auth(
|
result = conn.admin_initiate_auth(
|
||||||
UserPoolId=user_pool_id,
|
UserPoolId=user_pool_id,
|
||||||
ClientId=client_id,
|
ClientId=client_id,
|
||||||
AuthFlow="ADMIN_NO_SRP_AUTH",
|
AuthFlow=auth_flow,
|
||||||
AuthParameters={"USERNAME": username, "PASSWORD": temporary_password},
|
AuthParameters={"USERNAME": username, "PASSWORD": temporary_password},
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -1255,7 +1255,8 @@ def authentication_flow(conn):
|
|||||||
def test_authentication_flow():
|
def test_authentication_flow():
|
||||||
conn = boto3.client("cognito-idp", "us-west-2")
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
|
||||||
authentication_flow(conn)
|
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
|
||||||
|
authentication_flow(conn, auth_flow)
|
||||||
|
|
||||||
|
|
||||||
def user_authentication_flow(conn):
|
def user_authentication_flow(conn):
|
||||||
@ -1397,49 +1398,54 @@ def test_token_legitimacy():
|
|||||||
with open(os.path.join(os.path.dirname(__file__), path)) as f:
|
with open(os.path.join(os.path.dirname(__file__), path)) as f:
|
||||||
json_web_key = json.loads(f.read())["keys"][0]
|
json_web_key = json.loads(f.read())["keys"][0]
|
||||||
|
|
||||||
outputs = authentication_flow(conn)
|
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
|
||||||
id_token = outputs["id_token"]
|
outputs = authentication_flow(conn, auth_flow)
|
||||||
access_token = outputs["access_token"]
|
id_token = outputs["id_token"]
|
||||||
client_id = outputs["client_id"]
|
access_token = outputs["access_token"]
|
||||||
issuer = "https://cognito-idp.us-west-2.amazonaws.com/{}".format(
|
client_id = outputs["client_id"]
|
||||||
outputs["user_pool_id"]
|
issuer = "https://cognito-idp.us-west-2.amazonaws.com/{}".format(
|
||||||
)
|
outputs["user_pool_id"]
|
||||||
id_claims = json.loads(jws.verify(id_token, json_web_key, "RS256"))
|
)
|
||||||
id_claims["iss"].should.equal(issuer)
|
id_claims = json.loads(jws.verify(id_token, json_web_key, "RS256"))
|
||||||
id_claims["aud"].should.equal(client_id)
|
id_claims["iss"].should.equal(issuer)
|
||||||
id_claims["token_use"].should.equal("id")
|
id_claims["aud"].should.equal(client_id)
|
||||||
for k, v in outputs["additional_fields"].items():
|
id_claims["token_use"].should.equal("id")
|
||||||
id_claims[k].should.equal(v)
|
for k, v in outputs["additional_fields"].items():
|
||||||
access_claims = json.loads(jws.verify(access_token, json_web_key, "RS256"))
|
id_claims[k].should.equal(v)
|
||||||
access_claims["iss"].should.equal(issuer)
|
access_claims = json.loads(jws.verify(access_token, json_web_key, "RS256"))
|
||||||
access_claims["aud"].should.equal(client_id)
|
access_claims["iss"].should.equal(issuer)
|
||||||
access_claims["token_use"].should.equal("access")
|
access_claims["aud"].should.equal(client_id)
|
||||||
|
access_claims["token_use"].should.equal("access")
|
||||||
|
|
||||||
|
|
||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
def test_change_password():
|
def test_change_password():
|
||||||
conn = boto3.client("cognito-idp", "us-west-2")
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
|
||||||
outputs = authentication_flow(conn)
|
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
|
||||||
|
outputs = authentication_flow(conn, auth_flow)
|
||||||
|
|
||||||
# Take this opportunity to test change_password, which requires an access token.
|
# Take this opportunity to test change_password, which requires an access token.
|
||||||
newer_password = str(uuid.uuid4())
|
newer_password = str(uuid.uuid4())
|
||||||
conn.change_password(
|
conn.change_password(
|
||||||
AccessToken=outputs["access_token"],
|
AccessToken=outputs["access_token"],
|
||||||
PreviousPassword=outputs["password"],
|
PreviousPassword=outputs["password"],
|
||||||
ProposedPassword=newer_password,
|
ProposedPassword=newer_password,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Log in again, which should succeed without a challenge because the user is no
|
# Log in again, which should succeed without a challenge because the user is no
|
||||||
# longer in the force-new-password state.
|
# longer in the force-new-password state.
|
||||||
result = conn.admin_initiate_auth(
|
result = conn.admin_initiate_auth(
|
||||||
UserPoolId=outputs["user_pool_id"],
|
UserPoolId=outputs["user_pool_id"],
|
||||||
ClientId=outputs["client_id"],
|
ClientId=outputs["client_id"],
|
||||||
AuthFlow="ADMIN_NO_SRP_AUTH",
|
AuthFlow="ADMIN_NO_SRP_AUTH",
|
||||||
AuthParameters={"USERNAME": outputs["username"], "PASSWORD": newer_password},
|
AuthParameters={
|
||||||
)
|
"USERNAME": outputs["username"],
|
||||||
|
"PASSWORD": newer_password,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
result["AuthenticationResult"].should_not.be.none
|
result["AuthenticationResult"].should_not.be.none
|
||||||
|
|
||||||
|
|
||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
@ -1452,26 +1458,30 @@ def test_change_password__using_custom_user_agent_header():
|
|||||||
my_config = Config(user_agent_extra="more/info", signature_version="v4")
|
my_config = Config(user_agent_extra="more/info", signature_version="v4")
|
||||||
conn = boto3.client("cognito-idp", "us-west-2", config=my_config)
|
conn = boto3.client("cognito-idp", "us-west-2", config=my_config)
|
||||||
|
|
||||||
outputs = authentication_flow(conn)
|
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
|
||||||
|
outputs = authentication_flow(conn, auth_flow)
|
||||||
|
|
||||||
# Take this opportunity to test change_password, which requires an access token.
|
# Take this opportunity to test change_password, which requires an access token.
|
||||||
newer_password = str(uuid.uuid4())
|
newer_password = str(uuid.uuid4())
|
||||||
conn.change_password(
|
conn.change_password(
|
||||||
AccessToken=outputs["access_token"],
|
AccessToken=outputs["access_token"],
|
||||||
PreviousPassword=outputs["password"],
|
PreviousPassword=outputs["password"],
|
||||||
ProposedPassword=newer_password,
|
ProposedPassword=newer_password,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Log in again, which should succeed without a challenge because the user is no
|
# Log in again, which should succeed without a challenge because the user is no
|
||||||
# longer in the force-new-password state.
|
# longer in the force-new-password state.
|
||||||
result = conn.admin_initiate_auth(
|
result = conn.admin_initiate_auth(
|
||||||
UserPoolId=outputs["user_pool_id"],
|
UserPoolId=outputs["user_pool_id"],
|
||||||
ClientId=outputs["client_id"],
|
ClientId=outputs["client_id"],
|
||||||
AuthFlow="ADMIN_NO_SRP_AUTH",
|
AuthFlow="ADMIN_NO_SRP_AUTH",
|
||||||
AuthParameters={"USERNAME": outputs["username"], "PASSWORD": newer_password},
|
AuthParameters={
|
||||||
)
|
"USERNAME": outputs["username"],
|
||||||
|
"PASSWORD": newer_password,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
result["AuthenticationResult"].should_not.be.none
|
result["AuthenticationResult"].should_not.be.none
|
||||||
|
|
||||||
|
|
||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
@ -1737,36 +1747,42 @@ def test_initiate_auth_with_invalid_secret_hash():
|
|||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
def test_setting_mfa():
|
def test_setting_mfa():
|
||||||
conn = boto3.client("cognito-idp", "us-west-2")
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
result = authentication_flow(conn)
|
|
||||||
conn.associate_software_token(AccessToken=result["access_token"])
|
|
||||||
conn.verify_software_token(AccessToken=result["access_token"], UserCode="123456")
|
|
||||||
conn.set_user_mfa_preference(
|
|
||||||
AccessToken=result["access_token"],
|
|
||||||
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
|
|
||||||
)
|
|
||||||
result = conn.admin_get_user(
|
|
||||||
UserPoolId=result["user_pool_id"], Username=result["username"]
|
|
||||||
)
|
|
||||||
|
|
||||||
result["UserMFASettingList"].should.have.length_of(1)
|
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
|
||||||
|
result = authentication_flow(conn, auth_flow)
|
||||||
|
conn.associate_software_token(AccessToken=result["access_token"])
|
||||||
|
conn.verify_software_token(
|
||||||
|
AccessToken=result["access_token"], UserCode="123456"
|
||||||
|
)
|
||||||
|
conn.set_user_mfa_preference(
|
||||||
|
AccessToken=result["access_token"],
|
||||||
|
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
|
||||||
|
)
|
||||||
|
result = conn.admin_get_user(
|
||||||
|
UserPoolId=result["user_pool_id"], Username=result["username"]
|
||||||
|
)
|
||||||
|
|
||||||
|
result["UserMFASettingList"].should.have.length_of(1)
|
||||||
|
|
||||||
|
|
||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
def test_setting_mfa_when_token_not_verified():
|
def test_setting_mfa_when_token_not_verified():
|
||||||
conn = boto3.client("cognito-idp", "us-west-2")
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
result = authentication_flow(conn)
|
|
||||||
conn.associate_software_token(AccessToken=result["access_token"])
|
|
||||||
|
|
||||||
caught = False
|
for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]:
|
||||||
try:
|
result = authentication_flow(conn, auth_flow)
|
||||||
conn.set_user_mfa_preference(
|
conn.associate_software_token(AccessToken=result["access_token"])
|
||||||
AccessToken=result["access_token"],
|
|
||||||
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
|
|
||||||
)
|
|
||||||
except conn.exceptions.InvalidParameterException:
|
|
||||||
caught = True
|
|
||||||
|
|
||||||
caught.should.be.true
|
caught = False
|
||||||
|
try:
|
||||||
|
conn.set_user_mfa_preference(
|
||||||
|
AccessToken=result["access_token"],
|
||||||
|
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
|
||||||
|
)
|
||||||
|
except conn.exceptions.InvalidParameterException:
|
||||||
|
caught = True
|
||||||
|
|
||||||
|
caught.should.be.true
|
||||||
|
|
||||||
|
|
||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
|
Loading…
Reference in New Issue
Block a user