Restrict initiate_auth and admin_initiate_auth AuthFlow (#4836)

This commit is contained in:
Alastair McCormack 2022-02-07 23:55:51 +00:00 committed by GitHub
parent c25aae0d9a
commit 1ab00b48d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 145 additions and 6 deletions

View File

@ -3,6 +3,7 @@ import hashlib
import json
import os
import time
import typing
import uuid
import enum
import random
@ -39,6 +40,21 @@ class UserStatus(str, enum.Enum):
RESET_REQUIRED = "RESET_REQUIRED"
class AuthFlow(str, enum.Enum):
# Order follows AWS' order
ADMIN_NO_SRP_AUTH = "ADMIN_NO_SRP_AUTH"
ADMIN_USER_PASSWORD_AUTH = "ADMIN_USER_PASSWORD_AUTH"
USER_SRP_AUTH = "USER_SRP_AUTH"
REFRESH_TOKEN_AUTH = "REFRESH_TOKEN_AUTH"
REFRESH_TOKEN = "REFRESH_TOKEN"
CUSTOM_AUTH = "CUSTOM_AUTH"
USER_PASSWORD_AUTH = "USER_PASSWORD_AUTH"
@classmethod
def list(cls):
return [e.value for e in cls]
class CognitoIdpUserPoolAttribute(BaseModel):
STANDARD_SCHEMA = {
@ -1110,14 +1126,43 @@ class CognitoIdpBackend(BaseBackend):
}
}
def _validate_auth_flow(
self, auth_flow: str, valid_flows: typing.List[AuthFlow]
) -> AuthFlow:
""" validate auth_flow value and convert auth_flow to enum """
try:
auth_flow = AuthFlow[auth_flow]
except KeyError:
raise InvalidParameterException(
f"1 validation error detected: Value '{auth_flow}' at 'authFlow' failed to satisfy constraint: "
f"Member must satisfy enum value set: "
f"{AuthFlow.list()}"
)
if auth_flow not in valid_flows:
raise InvalidParameterException("Initiate Auth method not supported")
return auth_flow
def admin_initiate_auth(self, user_pool_id, client_id, auth_flow, auth_parameters):
admin_auth_flows = [
AuthFlow.ADMIN_NO_SRP_AUTH,
AuthFlow.ADMIN_USER_PASSWORD_AUTH,
AuthFlow.REFRESH_TOKEN_AUTH,
AuthFlow.REFRESH_TOKEN,
]
auth_flow = self._validate_auth_flow(
auth_flow=auth_flow, valid_flows=admin_auth_flows
)
user_pool = self.describe_user_pool(user_pool_id)
client = user_pool.clients.get(client_id)
if not client:
raise ResourceNotFoundError(client_id)
if auth_flow in ("ADMIN_USER_PASSWORD_AUTH", "ADMIN_NO_SRP_AUTH"):
if auth_flow in (AuthFlow.ADMIN_USER_PASSWORD_AUTH, AuthFlow.ADMIN_NO_SRP_AUTH):
username = auth_parameters.get("USERNAME")
password = auth_parameters.get("PASSWORD")
user = self.admin_get_user(user_pool_id, username)
@ -1139,7 +1184,7 @@ class CognitoIdpBackend(BaseBackend):
}
return self._log_user_in(user_pool, client, username)
elif auth_flow == "REFRESH_TOKEN":
elif auth_flow is AuthFlow.REFRESH_TOKEN:
refresh_token = auth_parameters.get("REFRESH_TOKEN")
(
id_token,
@ -1152,10 +1197,12 @@ class CognitoIdpBackend(BaseBackend):
"IdToken": id_token,
"AccessToken": access_token,
"ExpiresIn": expires_in,
"TokenType": "Bearer",
}
}
else:
return {}
# We shouldn't get here due to enum validation of auth_flow
return None
def respond_to_auth_challenge(
self, session, client_id, challenge_name, challenge_responses
@ -1429,6 +1476,18 @@ class CognitoIdpBackend(BaseBackend):
return ""
def initiate_auth(self, client_id, auth_flow, auth_parameters):
user_auth_flows = [
AuthFlow.USER_SRP_AUTH,
AuthFlow.REFRESH_TOKEN_AUTH,
AuthFlow.REFRESH_TOKEN,
AuthFlow.CUSTOM_AUTH,
AuthFlow.USER_PASSWORD_AUTH,
]
auth_flow = self._validate_auth_flow(
auth_flow=auth_flow, valid_flows=user_auth_flows
)
user_pool = None
for p in self.user_pools.values():
if client_id in p.clients:
@ -1438,7 +1497,7 @@ class CognitoIdpBackend(BaseBackend):
client = p.clients.get(client_id)
if auth_flow == "USER_SRP_AUTH":
if auth_flow is AuthFlow.USER_SRP_AUTH:
username = auth_parameters.get("USERNAME")
srp_a = auth_parameters.get("SRP_A")
if not srp_a:
@ -1469,7 +1528,7 @@ class CognitoIdpBackend(BaseBackend):
"SECRET_BLOCK": session,
},
}
elif auth_flow == "USER_PASSWORD_AUTH":
elif auth_flow is AuthFlow.USER_PASSWORD_AUTH:
username = auth_parameters.get("USERNAME")
password = auth_parameters.get("PASSWORD")
@ -1509,7 +1568,7 @@ class CognitoIdpBackend(BaseBackend):
"TokenType": "Bearer",
}
}
elif auth_flow in ("REFRESH_TOKEN", "REFRESH_TOKEN_AUTH"):
elif auth_flow in (AuthFlow.REFRESH_TOKEN, AuthFlow.REFRESH_TOKEN_AUTH):
refresh_token = auth_parameters.get("REFRESH_TOKEN")
if not refresh_token:
raise ResourceNotFoundError(refresh_token)
@ -1543,6 +1602,7 @@ class CognitoIdpBackend(BaseBackend):
}
}
else:
# We shouldn't get here due to enum validation of auth_flow
return None
def associate_software_token(self, access_token):

View File

@ -2455,6 +2455,36 @@ def test_authentication_flow():
authentication_flow(conn, auth_flow)
@mock_cognitoidp
def test_authentication_flow_invalid_flow():
conn = boto3.client("cognito-idp", "us-west-2")
with pytest.raises(ClientError) as ex:
authentication_flow(conn, "NO_SUCH_FLOW")
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterException")
err["Message"].should.equal(
"1 validation error detected: Value 'NO_SUCH_FLOW' at 'authFlow' failed to satisfy constraint: "
"Member must satisfy enum value set: "
"['ADMIN_NO_SRP_AUTH', 'ADMIN_USER_PASSWORD_AUTH', 'USER_SRP_AUTH', 'REFRESH_TOKEN_AUTH', 'REFRESH_TOKEN', "
"'CUSTOM_AUTH', 'USER_PASSWORD_AUTH']"
)
@mock_cognitoidp
def test_authentication_flow_invalid_user_flow():
""" Pass a user authFlow to admin_initiate_auth """
conn = boto3.client("cognito-idp", "us-west-2")
with pytest.raises(ClientError) as ex:
authentication_flow(conn, "USER_PASSWORD_AUTH")
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterException")
err["Message"].should.equal("Initiate Auth method not supported")
def user_authentication_flow(conn):
username = str(uuid.uuid4())
password = str(uuid.uuid4())
@ -3357,6 +3387,55 @@ def test_initiate_auth_USER_PASSWORD_AUTH():
result["AuthenticationResult"]["RefreshToken"].should_not.be.none
@mock_cognitoidp
def test_initiate_auth_invalid_auth_flow():
conn = boto3.client("cognito-idp", "us-west-2")
result = user_authentication_flow(conn)
with pytest.raises(ClientError) as ex:
user_authentication_flow(conn)
conn.initiate_auth(
ClientId=result["client_id"],
AuthFlow="NO_SUCH_FLOW",
AuthParameters={
"USERNAME": result["username"],
"PASSWORD": result["password"],
},
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterException")
err["Message"].should.equal(
"1 validation error detected: Value 'NO_SUCH_FLOW' at 'authFlow' failed to satisfy constraint: "
"Member must satisfy enum value set: ['ADMIN_NO_SRP_AUTH', 'ADMIN_USER_PASSWORD_AUTH', 'USER_SRP_AUTH', "
"'REFRESH_TOKEN_AUTH', 'REFRESH_TOKEN', 'CUSTOM_AUTH', 'USER_PASSWORD_AUTH']"
)
@mock_cognitoidp
def test_initiate_auth_invalid_admin_auth_flow():
""" Pass an admin auth_flow to the regular initiate_auth"""
conn = boto3.client("cognito-idp", "us-west-2")
result = user_authentication_flow(conn)
with pytest.raises(ClientError) as ex:
user_authentication_flow(conn)
conn.initiate_auth(
ClientId=result["client_id"],
AuthFlow="ADMIN_USER_PASSWORD_AUTH",
AuthParameters={
"USERNAME": result["username"],
"PASSWORD": result["password"],
},
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidParameterException")
err["Message"].should.equal("Initiate Auth method not supported")
@mock_cognitoidp
def test_initiate_auth_USER_PASSWORD_AUTH_with_FORCE_CHANGE_PASSWORD_status():
# Test flow: