added cognito-idp initiate_auth and PASSWORD_VERIFIER challenge to respond_to_auth_challenge (#3260)
* added cognito-idp initiate_auth and PASSWORD_VERIFIER challenge to respond_to_auth_challenge * fixed for python2 * added mfa, REFRESH_TOKEN to initiate_auth, SOFTWARE_TOKEN_MFA to respond_to_auth_challenge * added negative tests * test
This commit is contained in:
parent
bcf61d0b09
commit
236ab59afe
@ -45,6 +45,14 @@ class NotAuthorizedError(BadRequest):
|
||||
)
|
||||
|
||||
|
||||
class UserNotConfirmedException(BadRequest):
|
||||
def __init__(self, message):
|
||||
super(UserNotConfirmedException, self).__init__()
|
||||
self.description = json.dumps(
|
||||
{"message": message, "__type": "UserNotConfirmedException"}
|
||||
)
|
||||
|
||||
|
||||
class InvalidParameterException(JsonRESTError):
|
||||
def __init__(self, msg=None):
|
||||
self.code = 400
|
||||
|
@ -21,13 +21,15 @@ from .exceptions import (
|
||||
ResourceNotFoundError,
|
||||
UserNotFoundError,
|
||||
UsernameExistsException,
|
||||
UserNotConfirmedException,
|
||||
InvalidParameterException,
|
||||
)
|
||||
from .utils import create_id
|
||||
from .utils import create_id, check_secret_hash
|
||||
|
||||
UserStatus = {
|
||||
"FORCE_CHANGE_PASSWORD": "FORCE_CHANGE_PASSWORD",
|
||||
"CONFIRMED": "CONFIRMED",
|
||||
"UNCONFIRMED": "UNCONFIRMED",
|
||||
}
|
||||
|
||||
|
||||
@ -300,6 +302,9 @@ class CognitoIdpUser(BaseModel):
|
||||
self.attributes = attributes
|
||||
self.create_date = datetime.datetime.utcnow()
|
||||
self.last_modified_date = datetime.datetime.utcnow()
|
||||
self.sms_mfa_enabled = False
|
||||
self.software_token_mfa_enabled = False
|
||||
self.token_verified = False
|
||||
|
||||
# Groups this user is a member of.
|
||||
# Note that these links are bidirectional.
|
||||
@ -316,6 +321,11 @@ class CognitoIdpUser(BaseModel):
|
||||
|
||||
# list_users brings back "Attributes" while admin_get_user brings back "UserAttributes".
|
||||
def to_json(self, extended=False, attributes_key="Attributes"):
|
||||
user_mfa_setting_list = []
|
||||
if self.software_token_mfa_enabled:
|
||||
user_mfa_setting_list.append("SOFTWARE_TOKEN_MFA")
|
||||
elif self.sms_mfa_enabled:
|
||||
user_mfa_setting_list.append("SMS_MFA")
|
||||
user_json = self._base_json()
|
||||
if extended:
|
||||
user_json.update(
|
||||
@ -323,6 +333,7 @@ class CognitoIdpUser(BaseModel):
|
||||
"Enabled": self.enabled,
|
||||
attributes_key: self.attributes,
|
||||
"MFAOptions": [],
|
||||
"UserMFASettingList": user_mfa_setting_list,
|
||||
}
|
||||
)
|
||||
|
||||
@ -731,6 +742,9 @@ class CognitoIdpBackend(BaseBackend):
|
||||
def respond_to_auth_challenge(
|
||||
self, session, client_id, challenge_name, challenge_responses
|
||||
):
|
||||
if challenge_name == "PASSWORD_VERIFIER":
|
||||
session = challenge_responses.get("PASSWORD_CLAIM_SECRET_BLOCK")
|
||||
|
||||
user_pool = self.sessions.get(session)
|
||||
if not user_pool:
|
||||
raise ResourceNotFoundError(session)
|
||||
@ -751,6 +765,62 @@ class CognitoIdpBackend(BaseBackend):
|
||||
del self.sessions[session]
|
||||
|
||||
return self._log_user_in(user_pool, client, username)
|
||||
elif challenge_name == "PASSWORD_VERIFIER":
|
||||
username = challenge_responses.get("USERNAME")
|
||||
user = user_pool.users.get(username)
|
||||
if not user:
|
||||
raise UserNotFoundError(username)
|
||||
|
||||
password_claim_signature = challenge_responses.get(
|
||||
"PASSWORD_CLAIM_SIGNATURE"
|
||||
)
|
||||
if not password_claim_signature:
|
||||
raise ResourceNotFoundError(password_claim_signature)
|
||||
password_claim_secret_block = challenge_responses.get(
|
||||
"PASSWORD_CLAIM_SECRET_BLOCK"
|
||||
)
|
||||
if not password_claim_secret_block:
|
||||
raise ResourceNotFoundError(password_claim_secret_block)
|
||||
timestamp = challenge_responses.get("TIMESTAMP")
|
||||
if not timestamp:
|
||||
raise ResourceNotFoundError(timestamp)
|
||||
|
||||
if user.software_token_mfa_enabled:
|
||||
return {
|
||||
"ChallengeName": "SOFTWARE_TOKEN_MFA",
|
||||
"Session": session,
|
||||
"ChallengeParameters": {},
|
||||
}
|
||||
|
||||
if user.sms_mfa_enabled:
|
||||
return {
|
||||
"ChallengeName": "SMS_MFA",
|
||||
"Session": session,
|
||||
"ChallengeParameters": {},
|
||||
}
|
||||
|
||||
del self.sessions[session]
|
||||
return self._log_user_in(user_pool, client, username)
|
||||
elif challenge_name == "SOFTWARE_TOKEN_MFA":
|
||||
username = challenge_responses.get("USERNAME")
|
||||
user = user_pool.users.get(username)
|
||||
if not user:
|
||||
raise UserNotFoundError(username)
|
||||
|
||||
software_token_mfa_code = challenge_responses.get("SOFTWARE_TOKEN_MFA_CODE")
|
||||
if not software_token_mfa_code:
|
||||
raise ResourceNotFoundError(software_token_mfa_code)
|
||||
|
||||
if client.generate_secret:
|
||||
secret_hash = challenge_responses.get("SECRET_HASH")
|
||||
if not check_secret_hash(
|
||||
client.secret, client.id, username, secret_hash
|
||||
):
|
||||
raise NotAuthorizedError(secret_hash)
|
||||
|
||||
del self.sessions[session]
|
||||
return self._log_user_in(user_pool, client, username)
|
||||
|
||||
else:
|
||||
return {}
|
||||
|
||||
@ -806,6 +876,165 @@ class CognitoIdpBackend(BaseBackend):
|
||||
user_pool.resource_servers[identifier] = resource_server
|
||||
return resource_server
|
||||
|
||||
def sign_up(self, client_id, username, password, attributes):
|
||||
user_pool = None
|
||||
for p in self.user_pools.values():
|
||||
if client_id in p.clients:
|
||||
user_pool = p
|
||||
if user_pool is None:
|
||||
raise ResourceNotFoundError(client_id)
|
||||
|
||||
user = CognitoIdpUser(
|
||||
user_pool_id=user_pool.id,
|
||||
username=username,
|
||||
password=password,
|
||||
attributes=attributes,
|
||||
status=UserStatus["UNCONFIRMED"],
|
||||
)
|
||||
user_pool.users[user.username] = user
|
||||
return user
|
||||
|
||||
def confirm_sign_up(self, client_id, username, confirmation_code):
|
||||
user_pool = None
|
||||
for p in self.user_pools.values():
|
||||
if client_id in p.clients:
|
||||
user_pool = p
|
||||
if user_pool is None:
|
||||
raise ResourceNotFoundError(client_id)
|
||||
|
||||
if username not in user_pool.users:
|
||||
raise UserNotFoundError(username)
|
||||
|
||||
user = user_pool.users[username]
|
||||
user.status = UserStatus["CONFIRMED"]
|
||||
return ""
|
||||
|
||||
def initiate_auth(self, client_id, auth_flow, auth_parameters):
|
||||
user_pool = None
|
||||
for p in self.user_pools.values():
|
||||
if client_id in p.clients:
|
||||
user_pool = p
|
||||
if user_pool is None:
|
||||
raise ResourceNotFoundError(client_id)
|
||||
|
||||
client = p.clients.get(client_id)
|
||||
|
||||
if auth_flow == "USER_SRP_AUTH":
|
||||
username = auth_parameters.get("USERNAME")
|
||||
srp_a = auth_parameters.get("SRP_A")
|
||||
if not srp_a:
|
||||
raise ResourceNotFoundError(srp_a)
|
||||
if client.generate_secret:
|
||||
secret_hash = auth_parameters.get("SECRET_HASH")
|
||||
if not check_secret_hash(
|
||||
client.secret, client.id, username, secret_hash
|
||||
):
|
||||
raise NotAuthorizedError(secret_hash)
|
||||
|
||||
user = user_pool.users.get(username)
|
||||
if not user:
|
||||
raise UserNotFoundError(username)
|
||||
|
||||
if user.status == UserStatus["UNCONFIRMED"]:
|
||||
raise UserNotConfirmedException("User is not confirmed.")
|
||||
|
||||
session = str(uuid.uuid4())
|
||||
self.sessions[session] = user_pool
|
||||
|
||||
return {
|
||||
"ChallengeName": "PASSWORD_VERIFIER",
|
||||
"Session": session,
|
||||
"ChallengeParameters": {
|
||||
"SALT": str(uuid.uuid4()),
|
||||
"SRP_B": str(uuid.uuid4()),
|
||||
"USERNAME": user.id,
|
||||
"USER_ID_FOR_SRP": user.id,
|
||||
"SECRET_BLOCK": session,
|
||||
},
|
||||
}
|
||||
elif auth_flow == "REFRESH_TOKEN":
|
||||
refresh_token = auth_parameters.get("REFRESH_TOKEN")
|
||||
if not refresh_token:
|
||||
raise ResourceNotFoundError(refresh_token)
|
||||
|
||||
client_id, username = user_pool.refresh_tokens[refresh_token]
|
||||
if not username:
|
||||
raise ResourceNotFoundError(username)
|
||||
|
||||
if client.generate_secret:
|
||||
secret_hash = auth_parameters.get("SECRET_HASH")
|
||||
if not check_secret_hash(
|
||||
client.secret, client.id, username, secret_hash
|
||||
):
|
||||
raise NotAuthorizedError(secret_hash)
|
||||
|
||||
(
|
||||
id_token,
|
||||
access_token,
|
||||
expires_in,
|
||||
) = user_pool.create_tokens_from_refresh_token(refresh_token)
|
||||
|
||||
return {
|
||||
"AuthenticationResult": {
|
||||
"IdToken": id_token,
|
||||
"AccessToken": access_token,
|
||||
"ExpiresIn": expires_in,
|
||||
}
|
||||
}
|
||||
else:
|
||||
return None
|
||||
|
||||
def associate_software_token(self, access_token):
|
||||
for user_pool in self.user_pools.values():
|
||||
if access_token in user_pool.access_tokens:
|
||||
_, username = user_pool.access_tokens[access_token]
|
||||
user = user_pool.users.get(username)
|
||||
if not user:
|
||||
raise UserNotFoundError(username)
|
||||
|
||||
return {"SecretCode": str(uuid.uuid4())}
|
||||
else:
|
||||
raise NotAuthorizedError(access_token)
|
||||
|
||||
def verify_software_token(self, access_token, user_code):
|
||||
for user_pool in self.user_pools.values():
|
||||
if access_token in user_pool.access_tokens:
|
||||
_, username = user_pool.access_tokens[access_token]
|
||||
user = user_pool.users.get(username)
|
||||
if not user:
|
||||
raise UserNotFoundError(username)
|
||||
|
||||
user.token_verified = True
|
||||
|
||||
return {"Status": "SUCCESS"}
|
||||
else:
|
||||
raise NotAuthorizedError(access_token)
|
||||
|
||||
def set_user_mfa_preference(
|
||||
self, access_token, software_token_mfa_settings, sms_mfa_settings
|
||||
):
|
||||
for user_pool in self.user_pools.values():
|
||||
if access_token in user_pool.access_tokens:
|
||||
_, username = user_pool.access_tokens[access_token]
|
||||
user = user_pool.users.get(username)
|
||||
if not user:
|
||||
raise UserNotFoundError(username)
|
||||
|
||||
if software_token_mfa_settings["Enabled"]:
|
||||
if user.token_verified:
|
||||
user.software_token_mfa_enabled = True
|
||||
else:
|
||||
raise InvalidParameterException(
|
||||
"User has not verified software token mfa"
|
||||
)
|
||||
|
||||
elif sms_mfa_settings["Enabled"]:
|
||||
user.sms_mfa_enabled = True
|
||||
|
||||
return None
|
||||
else:
|
||||
raise NotAuthorizedError(access_token)
|
||||
|
||||
|
||||
cognitoidp_backends = {}
|
||||
for region in Session().get_available_regions("cognito-idp"):
|
||||
|
@ -4,7 +4,7 @@ import json
|
||||
import os
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from .models import cognitoidp_backends, find_region_by_value
|
||||
from .models import cognitoidp_backends, find_region_by_value, UserStatus
|
||||
|
||||
|
||||
class CognitoIdpResponse(BaseResponse):
|
||||
@ -390,6 +390,65 @@ class CognitoIdpResponse(BaseResponse):
|
||||
)
|
||||
return json.dumps({"ResourceServer": resource_server.to_json()})
|
||||
|
||||
def sign_up(self):
|
||||
client_id = self._get_param("ClientId")
|
||||
username = self._get_param("Username")
|
||||
password = self._get_param("Password")
|
||||
user = cognitoidp_backends[self.region].sign_up(
|
||||
client_id=client_id,
|
||||
username=username,
|
||||
password=password,
|
||||
attributes=self._get_param("UserAttributes", []),
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
"UserConfirmed": user.status == UserStatus["CONFIRMED"],
|
||||
"UserSub": user.id,
|
||||
}
|
||||
)
|
||||
|
||||
def confirm_sign_up(self):
|
||||
client_id = self._get_param("ClientId")
|
||||
username = self._get_param("Username")
|
||||
confirmation_code = self._get_param("ConfirmationCode")
|
||||
cognitoidp_backends[self.region].confirm_sign_up(
|
||||
client_id=client_id, username=username, confirmation_code=confirmation_code,
|
||||
)
|
||||
return ""
|
||||
|
||||
def initiate_auth(self):
|
||||
client_id = self._get_param("ClientId")
|
||||
auth_flow = self._get_param("AuthFlow")
|
||||
auth_parameters = self._get_param("AuthParameters")
|
||||
|
||||
auth_result = cognitoidp_backends[self.region].initiate_auth(
|
||||
client_id, auth_flow, auth_parameters
|
||||
)
|
||||
|
||||
return json.dumps(auth_result)
|
||||
|
||||
def associate_software_token(self):
|
||||
access_token = self._get_param("AccessToken")
|
||||
result = cognitoidp_backends[self.region].associate_software_token(access_token)
|
||||
return json.dumps(result)
|
||||
|
||||
def verify_software_token(self):
|
||||
access_token = self._get_param("AccessToken")
|
||||
user_code = self._get_param("UserCode")
|
||||
result = cognitoidp_backends[self.region].verify_software_token(
|
||||
access_token, user_code
|
||||
)
|
||||
return json.dumps(result)
|
||||
|
||||
def set_user_mfa_preference(self):
|
||||
access_token = self._get_param("AccessToken")
|
||||
software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings")
|
||||
sms_mfa_settings = self._get_param("SMSMfaSettings")
|
||||
cognitoidp_backends[self.region].set_user_mfa_preference(
|
||||
access_token, software_token_mfa_settings, sms_mfa_settings
|
||||
)
|
||||
return ""
|
||||
|
||||
|
||||
class CognitoIdpJsonWebKeyResponse(BaseResponse):
|
||||
def __init__(self):
|
||||
|
@ -2,9 +2,20 @@ from __future__ import unicode_literals
|
||||
import six
|
||||
import random
|
||||
import string
|
||||
import hashlib
|
||||
import hmac
|
||||
import base64
|
||||
|
||||
|
||||
def create_id():
|
||||
size = 26
|
||||
chars = list(range(10)) + list(string.ascii_lowercase)
|
||||
return "".join(six.text_type(random.choice(chars)) for x in range(size))
|
||||
|
||||
|
||||
def check_secret_hash(app_client_secret, app_client_id, username, secret_hash):
|
||||
key = bytes(str(app_client_secret).encode("latin-1"))
|
||||
msg = bytes(str(username + app_client_id).encode("latin-1"))
|
||||
new_digest = hmac.new(key, msg, hashlib.sha256).digest()
|
||||
SECRET_HASH = base64.b64encode(new_digest).decode()
|
||||
return SECRET_HASH == secret_hash
|
||||
|
@ -4,6 +4,9 @@ import json
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import hmac
|
||||
import hashlib
|
||||
import base64
|
||||
|
||||
import requests
|
||||
import uuid
|
||||
@ -1248,6 +1251,137 @@ def test_authentication_flow():
|
||||
authentication_flow(conn)
|
||||
|
||||
|
||||
def user_authentication_flow(conn):
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
user_attribute_name = str(uuid.uuid4())
|
||||
user_attribute_value = str(uuid.uuid4())
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id,
|
||||
ClientName=str(uuid.uuid4()),
|
||||
ReadAttributes=[user_attribute_name],
|
||||
GenerateSecret=True,
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
|
||||
conn.sign_up(
|
||||
ClientId=client_id, Username=username, Password=password,
|
||||
)
|
||||
|
||||
client_secret = conn.describe_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientId=client_id,
|
||||
)["UserPoolClient"]["ClientSecret"]
|
||||
|
||||
conn.confirm_sign_up(
|
||||
ClientId=client_id, Username=username, ConfirmationCode="123456",
|
||||
)
|
||||
|
||||
# generating secret hash
|
||||
key = bytes(str(client_secret).encode("latin-1"))
|
||||
msg = bytes(str(username + client_id).encode("latin-1"))
|
||||
new_digest = hmac.new(key, msg, hashlib.sha256).digest()
|
||||
secret_hash = base64.b64encode(new_digest).decode()
|
||||
|
||||
result = conn.initiate_auth(
|
||||
ClientId=client_id,
|
||||
AuthFlow="USER_SRP_AUTH",
|
||||
AuthParameters={
|
||||
"USERNAME": username,
|
||||
"SRP_A": str(uuid.uuid4()),
|
||||
"SECRET_HASH": secret_hash,
|
||||
},
|
||||
)
|
||||
|
||||
result = conn.respond_to_auth_challenge(
|
||||
ClientId=client_id,
|
||||
ChallengeName=result["ChallengeName"],
|
||||
ChallengeResponses={
|
||||
"PASSWORD_CLAIM_SIGNATURE": str(uuid.uuid4()),
|
||||
"PASSWORD_CLAIM_SECRET_BLOCK": result["Session"],
|
||||
"TIMESTAMP": str(uuid.uuid4()),
|
||||
"USERNAME": username,
|
||||
},
|
||||
)
|
||||
|
||||
refresh_token = result["AuthenticationResult"]["RefreshToken"]
|
||||
|
||||
# add mfa token
|
||||
conn.associate_software_token(
|
||||
AccessToken=result["AuthenticationResult"]["AccessToken"],
|
||||
)
|
||||
|
||||
conn.verify_software_token(
|
||||
AccessToken=result["AuthenticationResult"]["AccessToken"], UserCode="123456",
|
||||
)
|
||||
|
||||
conn.set_user_mfa_preference(
|
||||
AccessToken=result["AuthenticationResult"]["AccessToken"],
|
||||
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True,},
|
||||
)
|
||||
|
||||
result = conn.initiate_auth(
|
||||
ClientId=client_id,
|
||||
AuthFlow="REFRESH_TOKEN",
|
||||
AuthParameters={"SECRET_HASH": secret_hash, "REFRESH_TOKEN": refresh_token,},
|
||||
)
|
||||
|
||||
result["AuthenticationResult"]["IdToken"].should_not.be.none
|
||||
result["AuthenticationResult"]["AccessToken"].should_not.be.none
|
||||
|
||||
# authenticate user once again this time with mfa token
|
||||
result = conn.initiate_auth(
|
||||
ClientId=client_id,
|
||||
AuthFlow="USER_SRP_AUTH",
|
||||
AuthParameters={
|
||||
"USERNAME": username,
|
||||
"SRP_A": str(uuid.uuid4()),
|
||||
"SECRET_HASH": secret_hash,
|
||||
},
|
||||
)
|
||||
|
||||
result = conn.respond_to_auth_challenge(
|
||||
ClientId=client_id,
|
||||
ChallengeName=result["ChallengeName"],
|
||||
ChallengeResponses={
|
||||
"PASSWORD_CLAIM_SIGNATURE": str(uuid.uuid4()),
|
||||
"PASSWORD_CLAIM_SECRET_BLOCK": result["Session"],
|
||||
"TIMESTAMP": str(uuid.uuid4()),
|
||||
"USERNAME": username,
|
||||
},
|
||||
)
|
||||
|
||||
result = conn.respond_to_auth_challenge(
|
||||
ClientId=client_id,
|
||||
Session=result["Session"],
|
||||
ChallengeName=result["ChallengeName"],
|
||||
ChallengeResponses={
|
||||
"SOFTWARE_TOKEN_MFA_CODE": "123456",
|
||||
"USERNAME": username,
|
||||
"SECRET_HASH": secret_hash,
|
||||
},
|
||||
)
|
||||
|
||||
return {
|
||||
"user_pool_id": user_pool_id,
|
||||
"client_id": client_id,
|
||||
"client_secret": client_secret,
|
||||
"secret_hash": secret_hash,
|
||||
"id_token": result["AuthenticationResult"]["IdToken"],
|
||||
"access_token": result["AuthenticationResult"]["AccessToken"],
|
||||
"refresh_token": refresh_token,
|
||||
"username": username,
|
||||
"password": password,
|
||||
"additional_fields": {user_attribute_name: user_attribute_value},
|
||||
}
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_user_authentication_flow():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
user_authentication_flow(conn)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_token_legitimacy():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
@ -1437,6 +1571,244 @@ def test_resource_server():
|
||||
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_sign_up():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()),
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
result = conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
result["UserConfirmed"].should.be.false
|
||||
result["UserSub"].should_not.be.none
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_confirm_sign_up():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
|
||||
conn.confirm_sign_up(
|
||||
ClientId=client_id, Username=username, ConfirmationCode="123456",
|
||||
)
|
||||
|
||||
result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
|
||||
result["UserStatus"].should.equal("CONFIRMED")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_initiate_auth_USER_SRP_AUTH():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
client_secret = conn.describe_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientId=client_id,
|
||||
)["UserPoolClient"]["ClientSecret"]
|
||||
conn.confirm_sign_up(
|
||||
ClientId=client_id, Username=username, ConfirmationCode="123456",
|
||||
)
|
||||
|
||||
key = bytes(str(client_secret).encode("latin-1"))
|
||||
msg = bytes(str(username + client_id).encode("latin-1"))
|
||||
new_digest = hmac.new(key, msg, hashlib.sha256).digest()
|
||||
secret_hash = base64.b64encode(new_digest).decode()
|
||||
|
||||
result = conn.initiate_auth(
|
||||
ClientId=client_id,
|
||||
AuthFlow="USER_SRP_AUTH",
|
||||
AuthParameters={
|
||||
"USERNAME": username,
|
||||
"SRP_A": str(uuid.uuid4()),
|
||||
"SECRET_HASH": secret_hash,
|
||||
},
|
||||
)
|
||||
|
||||
result["ChallengeName"].should.equal("PASSWORD_VERIFIER")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_initiate_auth_REFRESH_TOKEN():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
result = user_authentication_flow(conn)
|
||||
result = conn.initiate_auth(
|
||||
ClientId=result["client_id"],
|
||||
AuthFlow="REFRESH_TOKEN",
|
||||
AuthParameters={
|
||||
"REFRESH_TOKEN": result["refresh_token"],
|
||||
"SECRET_HASH": result["secret_hash"],
|
||||
},
|
||||
)
|
||||
|
||||
result["AuthenticationResult"]["AccessToken"].should_not.be.none
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_initiate_auth_for_unconfirmed_user():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
client_secret = conn.describe_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientId=client_id,
|
||||
)["UserPoolClient"]["ClientSecret"]
|
||||
|
||||
key = bytes(str(client_secret).encode("latin-1"))
|
||||
msg = bytes(str(username + client_id).encode("latin-1"))
|
||||
new_digest = hmac.new(key, msg, hashlib.sha256).digest()
|
||||
secret_hash = base64.b64encode(new_digest).decode()
|
||||
|
||||
caught = False
|
||||
try:
|
||||
result = conn.initiate_auth(
|
||||
ClientId=client_id,
|
||||
AuthFlow="USER_SRP_AUTH",
|
||||
AuthParameters={
|
||||
"USERNAME": username,
|
||||
"SRP_A": str(uuid.uuid4()),
|
||||
"SECRET_HASH": secret_hash,
|
||||
},
|
||||
)
|
||||
except conn.exceptions.UserNotConfirmedException:
|
||||
caught = True
|
||||
|
||||
caught.should.be.true
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_initiate_auth_with_invalid_secret_hash():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
username = str(uuid.uuid4())
|
||||
password = str(uuid.uuid4())
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
client_id = conn.create_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
|
||||
)["UserPoolClient"]["ClientId"]
|
||||
conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||
client_secret = conn.describe_user_pool_client(
|
||||
UserPoolId=user_pool_id, ClientId=client_id,
|
||||
)["UserPoolClient"]["ClientSecret"]
|
||||
conn.confirm_sign_up(
|
||||
ClientId=client_id, Username=username, ConfirmationCode="123456",
|
||||
)
|
||||
|
||||
invalid_secret_hash = str(uuid.uuid4())
|
||||
|
||||
caught = False
|
||||
try:
|
||||
result = conn.initiate_auth(
|
||||
ClientId=client_id,
|
||||
AuthFlow="USER_SRP_AUTH",
|
||||
AuthParameters={
|
||||
"USERNAME": username,
|
||||
"SRP_A": str(uuid.uuid4()),
|
||||
"SECRET_HASH": invalid_secret_hash,
|
||||
},
|
||||
)
|
||||
except conn.exceptions.NotAuthorizedException:
|
||||
caught = True
|
||||
|
||||
caught.should.be.true
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_setting_mfa():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
result = authentication_flow(conn)
|
||||
conn.associate_software_token(AccessToken=result["access_token"])
|
||||
conn.verify_software_token(AccessToken=result["access_token"], UserCode="123456")
|
||||
conn.set_user_mfa_preference(
|
||||
AccessToken=result["access_token"],
|
||||
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
|
||||
)
|
||||
result = conn.admin_get_user(
|
||||
UserPoolId=result["user_pool_id"], Username=result["username"]
|
||||
)
|
||||
|
||||
result["UserMFASettingList"].should.have.length_of(1)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_setting_mfa_when_token_not_verified():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
result = authentication_flow(conn)
|
||||
conn.associate_software_token(AccessToken=result["access_token"])
|
||||
|
||||
caught = False
|
||||
try:
|
||||
conn.set_user_mfa_preference(
|
||||
AccessToken=result["access_token"],
|
||||
SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True},
|
||||
)
|
||||
except conn.exceptions.InvalidParameterException:
|
||||
caught = True
|
||||
|
||||
caught.should.be.true
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_respond_to_auth_challenge_with_invalid_secret_hash():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
result = user_authentication_flow(conn)
|
||||
|
||||
valid_secret_hash = result["secret_hash"]
|
||||
invalid_secret_hash = str(uuid.uuid4())
|
||||
|
||||
challenge = conn.initiate_auth(
|
||||
ClientId=result["client_id"],
|
||||
AuthFlow="USER_SRP_AUTH",
|
||||
AuthParameters={
|
||||
"USERNAME": result["username"],
|
||||
"SRP_A": str(uuid.uuid4()),
|
||||
"SECRET_HASH": valid_secret_hash,
|
||||
},
|
||||
)
|
||||
|
||||
challenge = conn.respond_to_auth_challenge(
|
||||
ClientId=result["client_id"],
|
||||
ChallengeName=challenge["ChallengeName"],
|
||||
ChallengeResponses={
|
||||
"PASSWORD_CLAIM_SIGNATURE": str(uuid.uuid4()),
|
||||
"PASSWORD_CLAIM_SECRET_BLOCK": challenge["Session"],
|
||||
"TIMESTAMP": str(uuid.uuid4()),
|
||||
"USERNAME": result["username"],
|
||||
},
|
||||
)
|
||||
|
||||
caught = False
|
||||
try:
|
||||
conn.respond_to_auth_challenge(
|
||||
ClientId=result["client_id"],
|
||||
Session=challenge["Session"],
|
||||
ChallengeName=challenge["ChallengeName"],
|
||||
ChallengeResponses={
|
||||
"SOFTWARE_TOKEN_MFA_CODE": "123456",
|
||||
"USERNAME": result["username"],
|
||||
"SECRET_HASH": invalid_secret_hash,
|
||||
},
|
||||
)
|
||||
except conn.exceptions.NotAuthorizedException:
|
||||
caught = True
|
||||
|
||||
caught.should.be.true
|
||||
|
||||
|
||||
# Test will retrieve public key from cognito.amazonaws.com/.well-known/jwks.json,
|
||||
# which isnt mocked in ServerMode
|
||||
if not settings.TEST_SERVER_MODE:
|
||||
|
Loading…
Reference in New Issue
Block a user