diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index 13da5d41d..b51888cb3 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -719,6 +719,27 @@ class CognitoIdpBackend(BaseBackend): "Session": session, } + return self._log_user_in(user_pool, client, username) + elif auth_flow == "ADMIN_USER_PASSWORD_AUTH": + username = auth_parameters.get("USERNAME") + password = auth_parameters.get("PASSWORD") + user = user_pool.users.get(username) + if not user: + raise UserNotFoundError(username) + + if user.password != password: + raise NotAuthorizedError(username) + + if user.status == UserStatus["FORCE_CHANGE_PASSWORD"]: + session = str(uuid.uuid4()) + self.sessions[session] = user_pool + + return { + "ChallengeName": "NEW_PASSWORD_REQUIRED", + "ChallengeParameters": {}, + "Session": session, + } + return self._log_user_in(user_pool, client, username) elif auth_flow == "REFRESH_TOKEN": refresh_token = auth_parameters.get("REFRESH_TOKEN") diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index f45f7c079..8ce4ff88d 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -1198,7 +1198,7 @@ def test_admin_delete_user(): caught.should.be.true -def authentication_flow(conn): +def authentication_flow(conn, auth_flow): username = str(uuid.uuid4()) temporary_password = str(uuid.uuid4()) user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] @@ -1220,7 +1220,7 @@ def authentication_flow(conn): result = conn.admin_initiate_auth( UserPoolId=user_pool_id, ClientId=client_id, - AuthFlow="ADMIN_NO_SRP_AUTH", + AuthFlow=auth_flow, AuthParameters={"USERNAME": username, "PASSWORD": temporary_password}, ) @@ -1255,7 +1255,8 @@ def authentication_flow(conn): def test_authentication_flow(): conn = boto3.client("cognito-idp", "us-west-2") - authentication_flow(conn) + for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]: + authentication_flow(conn, auth_flow) def user_authentication_flow(conn): @@ -1397,49 +1398,54 @@ def test_token_legitimacy(): with open(os.path.join(os.path.dirname(__file__), path)) as f: json_web_key = json.loads(f.read())["keys"][0] - outputs = authentication_flow(conn) - id_token = outputs["id_token"] - access_token = outputs["access_token"] - client_id = outputs["client_id"] - issuer = "https://cognito-idp.us-west-2.amazonaws.com/{}".format( - outputs["user_pool_id"] - ) - id_claims = json.loads(jws.verify(id_token, json_web_key, "RS256")) - id_claims["iss"].should.equal(issuer) - id_claims["aud"].should.equal(client_id) - id_claims["token_use"].should.equal("id") - for k, v in outputs["additional_fields"].items(): - id_claims[k].should.equal(v) - access_claims = json.loads(jws.verify(access_token, json_web_key, "RS256")) - access_claims["iss"].should.equal(issuer) - access_claims["aud"].should.equal(client_id) - access_claims["token_use"].should.equal("access") + for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]: + outputs = authentication_flow(conn, auth_flow) + id_token = outputs["id_token"] + access_token = outputs["access_token"] + client_id = outputs["client_id"] + issuer = "https://cognito-idp.us-west-2.amazonaws.com/{}".format( + outputs["user_pool_id"] + ) + id_claims = json.loads(jws.verify(id_token, json_web_key, "RS256")) + id_claims["iss"].should.equal(issuer) + id_claims["aud"].should.equal(client_id) + id_claims["token_use"].should.equal("id") + for k, v in outputs["additional_fields"].items(): + id_claims[k].should.equal(v) + access_claims = json.loads(jws.verify(access_token, json_web_key, "RS256")) + access_claims["iss"].should.equal(issuer) + access_claims["aud"].should.equal(client_id) + access_claims["token_use"].should.equal("access") @mock_cognitoidp def test_change_password(): conn = boto3.client("cognito-idp", "us-west-2") - outputs = authentication_flow(conn) + for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]: + outputs = authentication_flow(conn, auth_flow) - # Take this opportunity to test change_password, which requires an access token. - newer_password = str(uuid.uuid4()) - conn.change_password( - AccessToken=outputs["access_token"], - PreviousPassword=outputs["password"], - ProposedPassword=newer_password, - ) + # Take this opportunity to test change_password, which requires an access token. + newer_password = str(uuid.uuid4()) + conn.change_password( + AccessToken=outputs["access_token"], + PreviousPassword=outputs["password"], + ProposedPassword=newer_password, + ) - # Log in again, which should succeed without a challenge because the user is no - # longer in the force-new-password state. - result = conn.admin_initiate_auth( - UserPoolId=outputs["user_pool_id"], - ClientId=outputs["client_id"], - AuthFlow="ADMIN_NO_SRP_AUTH", - AuthParameters={"USERNAME": outputs["username"], "PASSWORD": newer_password}, - ) + # Log in again, which should succeed without a challenge because the user is no + # longer in the force-new-password state. + result = conn.admin_initiate_auth( + UserPoolId=outputs["user_pool_id"], + ClientId=outputs["client_id"], + AuthFlow="ADMIN_NO_SRP_AUTH", + AuthParameters={ + "USERNAME": outputs["username"], + "PASSWORD": newer_password, + }, + ) - result["AuthenticationResult"].should_not.be.none + result["AuthenticationResult"].should_not.be.none @mock_cognitoidp @@ -1452,26 +1458,30 @@ def test_change_password__using_custom_user_agent_header(): my_config = Config(user_agent_extra="more/info", signature_version="v4") conn = boto3.client("cognito-idp", "us-west-2", config=my_config) - outputs = authentication_flow(conn) + for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]: + outputs = authentication_flow(conn, auth_flow) - # Take this opportunity to test change_password, which requires an access token. - newer_password = str(uuid.uuid4()) - conn.change_password( - AccessToken=outputs["access_token"], - PreviousPassword=outputs["password"], - ProposedPassword=newer_password, - ) + # Take this opportunity to test change_password, which requires an access token. + newer_password = str(uuid.uuid4()) + conn.change_password( + AccessToken=outputs["access_token"], + PreviousPassword=outputs["password"], + ProposedPassword=newer_password, + ) - # Log in again, which should succeed without a challenge because the user is no - # longer in the force-new-password state. - result = conn.admin_initiate_auth( - UserPoolId=outputs["user_pool_id"], - ClientId=outputs["client_id"], - AuthFlow="ADMIN_NO_SRP_AUTH", - AuthParameters={"USERNAME": outputs["username"], "PASSWORD": newer_password}, - ) + # Log in again, which should succeed without a challenge because the user is no + # longer in the force-new-password state. + result = conn.admin_initiate_auth( + UserPoolId=outputs["user_pool_id"], + ClientId=outputs["client_id"], + AuthFlow="ADMIN_NO_SRP_AUTH", + AuthParameters={ + "USERNAME": outputs["username"], + "PASSWORD": newer_password, + }, + ) - result["AuthenticationResult"].should_not.be.none + result["AuthenticationResult"].should_not.be.none @mock_cognitoidp @@ -1737,36 +1747,42 @@ def test_initiate_auth_with_invalid_secret_hash(): @mock_cognitoidp def test_setting_mfa(): conn = boto3.client("cognito-idp", "us-west-2") - result = authentication_flow(conn) - conn.associate_software_token(AccessToken=result["access_token"]) - conn.verify_software_token(AccessToken=result["access_token"], UserCode="123456") - conn.set_user_mfa_preference( - AccessToken=result["access_token"], - SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, - ) - result = conn.admin_get_user( - UserPoolId=result["user_pool_id"], Username=result["username"] - ) - result["UserMFASettingList"].should.have.length_of(1) + for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]: + result = authentication_flow(conn, auth_flow) + conn.associate_software_token(AccessToken=result["access_token"]) + conn.verify_software_token( + AccessToken=result["access_token"], UserCode="123456" + ) + conn.set_user_mfa_preference( + AccessToken=result["access_token"], + SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, + ) + result = conn.admin_get_user( + UserPoolId=result["user_pool_id"], Username=result["username"] + ) + + result["UserMFASettingList"].should.have.length_of(1) @mock_cognitoidp def test_setting_mfa_when_token_not_verified(): conn = boto3.client("cognito-idp", "us-west-2") - result = authentication_flow(conn) - conn.associate_software_token(AccessToken=result["access_token"]) - caught = False - try: - conn.set_user_mfa_preference( - AccessToken=result["access_token"], - SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, - ) - except conn.exceptions.InvalidParameterException: - caught = True + for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]: + result = authentication_flow(conn, auth_flow) + conn.associate_software_token(AccessToken=result["access_token"]) - caught.should.be.true + caught = False + try: + conn.set_user_mfa_preference( + AccessToken=result["access_token"], + SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, + ) + except conn.exceptions.InvalidParameterException: + caught = True + + caught.should.be.true @mock_cognitoidp