diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index 54411487a..2167e4aed 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -3,6 +3,7 @@ import hashlib import json import os import time +import typing import uuid import enum import random @@ -39,6 +40,21 @@ class UserStatus(str, enum.Enum): RESET_REQUIRED = "RESET_REQUIRED" +class AuthFlow(str, enum.Enum): + # Order follows AWS' order + ADMIN_NO_SRP_AUTH = "ADMIN_NO_SRP_AUTH" + ADMIN_USER_PASSWORD_AUTH = "ADMIN_USER_PASSWORD_AUTH" + USER_SRP_AUTH = "USER_SRP_AUTH" + REFRESH_TOKEN_AUTH = "REFRESH_TOKEN_AUTH" + REFRESH_TOKEN = "REFRESH_TOKEN" + CUSTOM_AUTH = "CUSTOM_AUTH" + USER_PASSWORD_AUTH = "USER_PASSWORD_AUTH" + + @classmethod + def list(cls): + return [e.value for e in cls] + + class CognitoIdpUserPoolAttribute(BaseModel): STANDARD_SCHEMA = { @@ -1110,14 +1126,43 @@ class CognitoIdpBackend(BaseBackend): } } + def _validate_auth_flow( + self, auth_flow: str, valid_flows: typing.List[AuthFlow] + ) -> AuthFlow: + """ validate auth_flow value and convert auth_flow to enum """ + + try: + auth_flow = AuthFlow[auth_flow] + except KeyError: + raise InvalidParameterException( + f"1 validation error detected: Value '{auth_flow}' at 'authFlow' failed to satisfy constraint: " + f"Member must satisfy enum value set: " + f"{AuthFlow.list()}" + ) + + if auth_flow not in valid_flows: + raise InvalidParameterException("Initiate Auth method not supported") + + return auth_flow + def admin_initiate_auth(self, user_pool_id, client_id, auth_flow, auth_parameters): + admin_auth_flows = [ + AuthFlow.ADMIN_NO_SRP_AUTH, + AuthFlow.ADMIN_USER_PASSWORD_AUTH, + AuthFlow.REFRESH_TOKEN_AUTH, + AuthFlow.REFRESH_TOKEN, + ] + auth_flow = self._validate_auth_flow( + auth_flow=auth_flow, valid_flows=admin_auth_flows + ) + user_pool = self.describe_user_pool(user_pool_id) client = user_pool.clients.get(client_id) if not client: raise ResourceNotFoundError(client_id) - if auth_flow in ("ADMIN_USER_PASSWORD_AUTH", "ADMIN_NO_SRP_AUTH"): + if auth_flow in (AuthFlow.ADMIN_USER_PASSWORD_AUTH, AuthFlow.ADMIN_NO_SRP_AUTH): username = auth_parameters.get("USERNAME") password = auth_parameters.get("PASSWORD") user = self.admin_get_user(user_pool_id, username) @@ -1139,7 +1184,7 @@ class CognitoIdpBackend(BaseBackend): } return self._log_user_in(user_pool, client, username) - elif auth_flow == "REFRESH_TOKEN": + elif auth_flow is AuthFlow.REFRESH_TOKEN: refresh_token = auth_parameters.get("REFRESH_TOKEN") ( id_token, @@ -1152,10 +1197,12 @@ class CognitoIdpBackend(BaseBackend): "IdToken": id_token, "AccessToken": access_token, "ExpiresIn": expires_in, + "TokenType": "Bearer", } } else: - return {} + # We shouldn't get here due to enum validation of auth_flow + return None def respond_to_auth_challenge( self, session, client_id, challenge_name, challenge_responses @@ -1429,6 +1476,18 @@ class CognitoIdpBackend(BaseBackend): return "" def initiate_auth(self, client_id, auth_flow, auth_parameters): + user_auth_flows = [ + AuthFlow.USER_SRP_AUTH, + AuthFlow.REFRESH_TOKEN_AUTH, + AuthFlow.REFRESH_TOKEN, + AuthFlow.CUSTOM_AUTH, + AuthFlow.USER_PASSWORD_AUTH, + ] + + auth_flow = self._validate_auth_flow( + auth_flow=auth_flow, valid_flows=user_auth_flows + ) + user_pool = None for p in self.user_pools.values(): if client_id in p.clients: @@ -1438,7 +1497,7 @@ class CognitoIdpBackend(BaseBackend): client = p.clients.get(client_id) - if auth_flow == "USER_SRP_AUTH": + if auth_flow is AuthFlow.USER_SRP_AUTH: username = auth_parameters.get("USERNAME") srp_a = auth_parameters.get("SRP_A") if not srp_a: @@ -1469,7 +1528,7 @@ class CognitoIdpBackend(BaseBackend): "SECRET_BLOCK": session, }, } - elif auth_flow == "USER_PASSWORD_AUTH": + elif auth_flow is AuthFlow.USER_PASSWORD_AUTH: username = auth_parameters.get("USERNAME") password = auth_parameters.get("PASSWORD") @@ -1509,7 +1568,7 @@ class CognitoIdpBackend(BaseBackend): "TokenType": "Bearer", } } - elif auth_flow in ("REFRESH_TOKEN", "REFRESH_TOKEN_AUTH"): + elif auth_flow in (AuthFlow.REFRESH_TOKEN, AuthFlow.REFRESH_TOKEN_AUTH): refresh_token = auth_parameters.get("REFRESH_TOKEN") if not refresh_token: raise ResourceNotFoundError(refresh_token) @@ -1543,6 +1602,7 @@ class CognitoIdpBackend(BaseBackend): } } else: + # We shouldn't get here due to enum validation of auth_flow return None def associate_software_token(self, access_token): diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index c6d518f85..4c44dc079 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -2455,6 +2455,36 @@ def test_authentication_flow(): authentication_flow(conn, auth_flow) +@mock_cognitoidp +def test_authentication_flow_invalid_flow(): + conn = boto3.client("cognito-idp", "us-west-2") + + with pytest.raises(ClientError) as ex: + authentication_flow(conn, "NO_SUCH_FLOW") + + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidParameterException") + err["Message"].should.equal( + "1 validation error detected: Value 'NO_SUCH_FLOW' at 'authFlow' failed to satisfy constraint: " + "Member must satisfy enum value set: " + "['ADMIN_NO_SRP_AUTH', 'ADMIN_USER_PASSWORD_AUTH', 'USER_SRP_AUTH', 'REFRESH_TOKEN_AUTH', 'REFRESH_TOKEN', " + "'CUSTOM_AUTH', 'USER_PASSWORD_AUTH']" + ) + + +@mock_cognitoidp +def test_authentication_flow_invalid_user_flow(): + """ Pass a user authFlow to admin_initiate_auth """ + conn = boto3.client("cognito-idp", "us-west-2") + + with pytest.raises(ClientError) as ex: + authentication_flow(conn, "USER_PASSWORD_AUTH") + + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidParameterException") + err["Message"].should.equal("Initiate Auth method not supported") + + def user_authentication_flow(conn): username = str(uuid.uuid4()) password = str(uuid.uuid4()) @@ -3357,6 +3387,55 @@ def test_initiate_auth_USER_PASSWORD_AUTH(): result["AuthenticationResult"]["RefreshToken"].should_not.be.none +@mock_cognitoidp +def test_initiate_auth_invalid_auth_flow(): + conn = boto3.client("cognito-idp", "us-west-2") + result = user_authentication_flow(conn) + + with pytest.raises(ClientError) as ex: + user_authentication_flow(conn) + + conn.initiate_auth( + ClientId=result["client_id"], + AuthFlow="NO_SUCH_FLOW", + AuthParameters={ + "USERNAME": result["username"], + "PASSWORD": result["password"], + }, + ) + + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidParameterException") + err["Message"].should.equal( + "1 validation error detected: Value 'NO_SUCH_FLOW' at 'authFlow' failed to satisfy constraint: " + "Member must satisfy enum value set: ['ADMIN_NO_SRP_AUTH', 'ADMIN_USER_PASSWORD_AUTH', 'USER_SRP_AUTH', " + "'REFRESH_TOKEN_AUTH', 'REFRESH_TOKEN', 'CUSTOM_AUTH', 'USER_PASSWORD_AUTH']" + ) + + +@mock_cognitoidp +def test_initiate_auth_invalid_admin_auth_flow(): + """ Pass an admin auth_flow to the regular initiate_auth""" + conn = boto3.client("cognito-idp", "us-west-2") + result = user_authentication_flow(conn) + + with pytest.raises(ClientError) as ex: + user_authentication_flow(conn) + + conn.initiate_auth( + ClientId=result["client_id"], + AuthFlow="ADMIN_USER_PASSWORD_AUTH", + AuthParameters={ + "USERNAME": result["username"], + "PASSWORD": result["password"], + }, + ) + + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidParameterException") + err["Message"].should.equal("Initiate Auth method not supported") + + @mock_cognitoidp def test_initiate_auth_USER_PASSWORD_AUTH_with_FORCE_CHANGE_PASSWORD_status(): # Test flow: