From 236ab59afeb94a5bffa1447cab3404ae9c32aee7 Mon Sep 17 00:00:00 2001 From: xsphrx <34844540+xsphrx@users.noreply.github.com> Date: Tue, 1 Sep 2020 10:20:31 +0200 Subject: [PATCH] added cognito-idp initiate_auth and PASSWORD_VERIFIER challenge to respond_to_auth_challenge (#3260) * added cognito-idp initiate_auth and PASSWORD_VERIFIER challenge to respond_to_auth_challenge * fixed for python2 * added mfa, REFRESH_TOKEN to initiate_auth, SOFTWARE_TOKEN_MFA to respond_to_auth_challenge * added negative tests * test --- moto/cognitoidp/exceptions.py | 8 + moto/cognitoidp/models.py | 231 +++++++++++++- moto/cognitoidp/responses.py | 61 +++- moto/cognitoidp/utils.py | 11 + tests/test_cognitoidp/test_cognitoidp.py | 372 +++++++++++++++++++++++ 5 files changed, 681 insertions(+), 2 deletions(-) diff --git a/moto/cognitoidp/exceptions.py b/moto/cognitoidp/exceptions.py index c9b6368ca..baf5f6526 100644 --- a/moto/cognitoidp/exceptions.py +++ b/moto/cognitoidp/exceptions.py @@ -45,6 +45,14 @@ class NotAuthorizedError(BadRequest): ) +class UserNotConfirmedException(BadRequest): + def __init__(self, message): + super(UserNotConfirmedException, self).__init__() + self.description = json.dumps( + {"message": message, "__type": "UserNotConfirmedException"} + ) + + class InvalidParameterException(JsonRESTError): def __init__(self, msg=None): self.code = 400 diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index a3cb69084..bfa7177f1 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -21,13 +21,15 @@ from .exceptions import ( ResourceNotFoundError, UserNotFoundError, UsernameExistsException, + UserNotConfirmedException, InvalidParameterException, ) -from .utils import create_id +from .utils import create_id, check_secret_hash UserStatus = { "FORCE_CHANGE_PASSWORD": "FORCE_CHANGE_PASSWORD", "CONFIRMED": "CONFIRMED", + "UNCONFIRMED": "UNCONFIRMED", } @@ -300,6 +302,9 @@ class CognitoIdpUser(BaseModel): self.attributes = attributes self.create_date = datetime.datetime.utcnow() self.last_modified_date = datetime.datetime.utcnow() + self.sms_mfa_enabled = False + self.software_token_mfa_enabled = False + self.token_verified = False # Groups this user is a member of. # Note that these links are bidirectional. @@ -316,6 +321,11 @@ class CognitoIdpUser(BaseModel): # list_users brings back "Attributes" while admin_get_user brings back "UserAttributes". def to_json(self, extended=False, attributes_key="Attributes"): + user_mfa_setting_list = [] + if self.software_token_mfa_enabled: + user_mfa_setting_list.append("SOFTWARE_TOKEN_MFA") + elif self.sms_mfa_enabled: + user_mfa_setting_list.append("SMS_MFA") user_json = self._base_json() if extended: user_json.update( @@ -323,6 +333,7 @@ class CognitoIdpUser(BaseModel): "Enabled": self.enabled, attributes_key: self.attributes, "MFAOptions": [], + "UserMFASettingList": user_mfa_setting_list, } ) @@ -731,6 +742,9 @@ class CognitoIdpBackend(BaseBackend): def respond_to_auth_challenge( self, session, client_id, challenge_name, challenge_responses ): + if challenge_name == "PASSWORD_VERIFIER": + session = challenge_responses.get("PASSWORD_CLAIM_SECRET_BLOCK") + user_pool = self.sessions.get(session) if not user_pool: raise ResourceNotFoundError(session) @@ -751,6 +765,62 @@ class CognitoIdpBackend(BaseBackend): del self.sessions[session] return self._log_user_in(user_pool, client, username) + elif challenge_name == "PASSWORD_VERIFIER": + username = challenge_responses.get("USERNAME") + user = user_pool.users.get(username) + if not user: + raise UserNotFoundError(username) + + password_claim_signature = challenge_responses.get( + "PASSWORD_CLAIM_SIGNATURE" + ) + if not password_claim_signature: + raise ResourceNotFoundError(password_claim_signature) + password_claim_secret_block = challenge_responses.get( + "PASSWORD_CLAIM_SECRET_BLOCK" + ) + if not password_claim_secret_block: + raise ResourceNotFoundError(password_claim_secret_block) + timestamp = challenge_responses.get("TIMESTAMP") + if not timestamp: + raise ResourceNotFoundError(timestamp) + + if user.software_token_mfa_enabled: + return { + "ChallengeName": "SOFTWARE_TOKEN_MFA", + "Session": session, + "ChallengeParameters": {}, + } + + if user.sms_mfa_enabled: + return { + "ChallengeName": "SMS_MFA", + "Session": session, + "ChallengeParameters": {}, + } + + del self.sessions[session] + return self._log_user_in(user_pool, client, username) + elif challenge_name == "SOFTWARE_TOKEN_MFA": + username = challenge_responses.get("USERNAME") + user = user_pool.users.get(username) + if not user: + raise UserNotFoundError(username) + + software_token_mfa_code = challenge_responses.get("SOFTWARE_TOKEN_MFA_CODE") + if not software_token_mfa_code: + raise ResourceNotFoundError(software_token_mfa_code) + + if client.generate_secret: + secret_hash = challenge_responses.get("SECRET_HASH") + if not check_secret_hash( + client.secret, client.id, username, secret_hash + ): + raise NotAuthorizedError(secret_hash) + + del self.sessions[session] + return self._log_user_in(user_pool, client, username) + else: return {} @@ -806,6 +876,165 @@ class CognitoIdpBackend(BaseBackend): user_pool.resource_servers[identifier] = resource_server return resource_server + def sign_up(self, client_id, username, password, attributes): + user_pool = None + for p in self.user_pools.values(): + if client_id in p.clients: + user_pool = p + if user_pool is None: + raise ResourceNotFoundError(client_id) + + user = CognitoIdpUser( + user_pool_id=user_pool.id, + username=username, + password=password, + attributes=attributes, + status=UserStatus["UNCONFIRMED"], + ) + user_pool.users[user.username] = user + return user + + def confirm_sign_up(self, client_id, username, confirmation_code): + user_pool = None + for p in self.user_pools.values(): + if client_id in p.clients: + user_pool = p + if user_pool is None: + raise ResourceNotFoundError(client_id) + + if username not in user_pool.users: + raise UserNotFoundError(username) + + user = user_pool.users[username] + user.status = UserStatus["CONFIRMED"] + return "" + + def initiate_auth(self, client_id, auth_flow, auth_parameters): + user_pool = None + for p in self.user_pools.values(): + if client_id in p.clients: + user_pool = p + if user_pool is None: + raise ResourceNotFoundError(client_id) + + client = p.clients.get(client_id) + + if auth_flow == "USER_SRP_AUTH": + username = auth_parameters.get("USERNAME") + srp_a = auth_parameters.get("SRP_A") + if not srp_a: + raise ResourceNotFoundError(srp_a) + if client.generate_secret: + secret_hash = auth_parameters.get("SECRET_HASH") + if not check_secret_hash( + client.secret, client.id, username, secret_hash + ): + raise NotAuthorizedError(secret_hash) + + user = user_pool.users.get(username) + if not user: + raise UserNotFoundError(username) + + if user.status == UserStatus["UNCONFIRMED"]: + raise UserNotConfirmedException("User is not confirmed.") + + session = str(uuid.uuid4()) + self.sessions[session] = user_pool + + return { + "ChallengeName": "PASSWORD_VERIFIER", + "Session": session, + "ChallengeParameters": { + "SALT": str(uuid.uuid4()), + "SRP_B": str(uuid.uuid4()), + "USERNAME": user.id, + "USER_ID_FOR_SRP": user.id, + "SECRET_BLOCK": session, + }, + } + elif auth_flow == "REFRESH_TOKEN": + refresh_token = auth_parameters.get("REFRESH_TOKEN") + if not refresh_token: + raise ResourceNotFoundError(refresh_token) + + client_id, username = user_pool.refresh_tokens[refresh_token] + if not username: + raise ResourceNotFoundError(username) + + if client.generate_secret: + secret_hash = auth_parameters.get("SECRET_HASH") + if not check_secret_hash( + client.secret, client.id, username, secret_hash + ): + raise NotAuthorizedError(secret_hash) + + ( + id_token, + access_token, + expires_in, + ) = user_pool.create_tokens_from_refresh_token(refresh_token) + + return { + "AuthenticationResult": { + "IdToken": id_token, + "AccessToken": access_token, + "ExpiresIn": expires_in, + } + } + else: + return None + + def associate_software_token(self, access_token): + for user_pool in self.user_pools.values(): + if access_token in user_pool.access_tokens: + _, username = user_pool.access_tokens[access_token] + user = user_pool.users.get(username) + if not user: + raise UserNotFoundError(username) + + return {"SecretCode": str(uuid.uuid4())} + else: + raise NotAuthorizedError(access_token) + + def verify_software_token(self, access_token, user_code): + for user_pool in self.user_pools.values(): + if access_token in user_pool.access_tokens: + _, username = user_pool.access_tokens[access_token] + user = user_pool.users.get(username) + if not user: + raise UserNotFoundError(username) + + user.token_verified = True + + return {"Status": "SUCCESS"} + else: + raise NotAuthorizedError(access_token) + + def set_user_mfa_preference( + self, access_token, software_token_mfa_settings, sms_mfa_settings + ): + for user_pool in self.user_pools.values(): + if access_token in user_pool.access_tokens: + _, username = user_pool.access_tokens[access_token] + user = user_pool.users.get(username) + if not user: + raise UserNotFoundError(username) + + if software_token_mfa_settings["Enabled"]: + if user.token_verified: + user.software_token_mfa_enabled = True + else: + raise InvalidParameterException( + "User has not verified software token mfa" + ) + + elif sms_mfa_settings["Enabled"]: + user.sms_mfa_enabled = True + + return None + else: + raise NotAuthorizedError(access_token) + cognitoidp_backends = {} for region in Session().get_available_regions("cognito-idp"): diff --git a/moto/cognitoidp/responses.py b/moto/cognitoidp/responses.py index 972ba883a..f3c005ff5 100644 --- a/moto/cognitoidp/responses.py +++ b/moto/cognitoidp/responses.py @@ -4,7 +4,7 @@ import json import os from moto.core.responses import BaseResponse -from .models import cognitoidp_backends, find_region_by_value +from .models import cognitoidp_backends, find_region_by_value, UserStatus class CognitoIdpResponse(BaseResponse): @@ -390,6 +390,65 @@ class CognitoIdpResponse(BaseResponse): ) return json.dumps({"ResourceServer": resource_server.to_json()}) + def sign_up(self): + client_id = self._get_param("ClientId") + username = self._get_param("Username") + password = self._get_param("Password") + user = cognitoidp_backends[self.region].sign_up( + client_id=client_id, + username=username, + password=password, + attributes=self._get_param("UserAttributes", []), + ) + return json.dumps( + { + "UserConfirmed": user.status == UserStatus["CONFIRMED"], + "UserSub": user.id, + } + ) + + def confirm_sign_up(self): + client_id = self._get_param("ClientId") + username = self._get_param("Username") + confirmation_code = self._get_param("ConfirmationCode") + cognitoidp_backends[self.region].confirm_sign_up( + client_id=client_id, username=username, confirmation_code=confirmation_code, + ) + return "" + + def initiate_auth(self): + client_id = self._get_param("ClientId") + auth_flow = self._get_param("AuthFlow") + auth_parameters = self._get_param("AuthParameters") + + auth_result = cognitoidp_backends[self.region].initiate_auth( + client_id, auth_flow, auth_parameters + ) + + return json.dumps(auth_result) + + def associate_software_token(self): + access_token = self._get_param("AccessToken") + result = cognitoidp_backends[self.region].associate_software_token(access_token) + return json.dumps(result) + + def verify_software_token(self): + access_token = self._get_param("AccessToken") + user_code = self._get_param("UserCode") + result = cognitoidp_backends[self.region].verify_software_token( + access_token, user_code + ) + return json.dumps(result) + + def set_user_mfa_preference(self): + access_token = self._get_param("AccessToken") + software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings") + sms_mfa_settings = self._get_param("SMSMfaSettings") + cognitoidp_backends[self.region].set_user_mfa_preference( + access_token, software_token_mfa_settings, sms_mfa_settings + ) + return "" + class CognitoIdpJsonWebKeyResponse(BaseResponse): def __init__(self): diff --git a/moto/cognitoidp/utils.py b/moto/cognitoidp/utils.py index 5f5fe4f8f..11f34bcae 100644 --- a/moto/cognitoidp/utils.py +++ b/moto/cognitoidp/utils.py @@ -2,9 +2,20 @@ from __future__ import unicode_literals import six import random import string +import hashlib +import hmac +import base64 def create_id(): size = 26 chars = list(range(10)) + list(string.ascii_lowercase) return "".join(six.text_type(random.choice(chars)) for x in range(size)) + + +def check_secret_hash(app_client_secret, app_client_id, username, secret_hash): + key = bytes(str(app_client_secret).encode("latin-1")) + msg = bytes(str(username + app_client_id).encode("latin-1")) + new_digest = hmac.new(key, msg, hashlib.sha256).digest() + SECRET_HASH = base64.b64encode(new_digest).decode() + return SECRET_HASH == secret_hash diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index 39875aeb4..65c5151e3 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -4,6 +4,9 @@ import json import os import random import re +import hmac +import hashlib +import base64 import requests import uuid @@ -1248,6 +1251,137 @@ def test_authentication_flow(): authentication_flow(conn) +def user_authentication_flow(conn): + username = str(uuid.uuid4()) + password = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + user_attribute_name = str(uuid.uuid4()) + user_attribute_value = str(uuid.uuid4()) + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, + ClientName=str(uuid.uuid4()), + ReadAttributes=[user_attribute_name], + GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + + conn.sign_up( + ClientId=client_id, Username=username, Password=password, + ) + + client_secret = conn.describe_user_pool_client( + UserPoolId=user_pool_id, ClientId=client_id, + )["UserPoolClient"]["ClientSecret"] + + conn.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456", + ) + + # generating secret hash + key = bytes(str(client_secret).encode("latin-1")) + msg = bytes(str(username + client_id).encode("latin-1")) + new_digest = hmac.new(key, msg, hashlib.sha256).digest() + secret_hash = base64.b64encode(new_digest).decode() + + result = conn.initiate_auth( + ClientId=client_id, + AuthFlow="USER_SRP_AUTH", + AuthParameters={ + "USERNAME": username, + "SRP_A": str(uuid.uuid4()), + "SECRET_HASH": secret_hash, + }, + ) + + result = conn.respond_to_auth_challenge( + ClientId=client_id, + ChallengeName=result["ChallengeName"], + ChallengeResponses={ + "PASSWORD_CLAIM_SIGNATURE": str(uuid.uuid4()), + "PASSWORD_CLAIM_SECRET_BLOCK": result["Session"], + "TIMESTAMP": str(uuid.uuid4()), + "USERNAME": username, + }, + ) + + refresh_token = result["AuthenticationResult"]["RefreshToken"] + + # add mfa token + conn.associate_software_token( + AccessToken=result["AuthenticationResult"]["AccessToken"], + ) + + conn.verify_software_token( + AccessToken=result["AuthenticationResult"]["AccessToken"], UserCode="123456", + ) + + conn.set_user_mfa_preference( + AccessToken=result["AuthenticationResult"]["AccessToken"], + SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True,}, + ) + + result = conn.initiate_auth( + ClientId=client_id, + AuthFlow="REFRESH_TOKEN", + AuthParameters={"SECRET_HASH": secret_hash, "REFRESH_TOKEN": refresh_token,}, + ) + + result["AuthenticationResult"]["IdToken"].should_not.be.none + result["AuthenticationResult"]["AccessToken"].should_not.be.none + + # authenticate user once again this time with mfa token + result = conn.initiate_auth( + ClientId=client_id, + AuthFlow="USER_SRP_AUTH", + AuthParameters={ + "USERNAME": username, + "SRP_A": str(uuid.uuid4()), + "SECRET_HASH": secret_hash, + }, + ) + + result = conn.respond_to_auth_challenge( + ClientId=client_id, + ChallengeName=result["ChallengeName"], + ChallengeResponses={ + "PASSWORD_CLAIM_SIGNATURE": str(uuid.uuid4()), + "PASSWORD_CLAIM_SECRET_BLOCK": result["Session"], + "TIMESTAMP": str(uuid.uuid4()), + "USERNAME": username, + }, + ) + + result = conn.respond_to_auth_challenge( + ClientId=client_id, + Session=result["Session"], + ChallengeName=result["ChallengeName"], + ChallengeResponses={ + "SOFTWARE_TOKEN_MFA_CODE": "123456", + "USERNAME": username, + "SECRET_HASH": secret_hash, + }, + ) + + return { + "user_pool_id": user_pool_id, + "client_id": client_id, + "client_secret": client_secret, + "secret_hash": secret_hash, + "id_token": result["AuthenticationResult"]["IdToken"], + "access_token": result["AuthenticationResult"]["AccessToken"], + "refresh_token": refresh_token, + "username": username, + "password": password, + "additional_fields": {user_attribute_name: user_attribute_value}, + } + + +@mock_cognitoidp +def test_user_authentication_flow(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_authentication_flow(conn) + + @mock_cognitoidp def test_token_legitimacy(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1437,6 +1571,244 @@ def test_resource_server(): ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) +@mock_cognitoidp +def test_sign_up(): + conn = boto3.client("cognito-idp", "us-west-2") + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), + )["UserPoolClient"]["ClientId"] + username = str(uuid.uuid4()) + password = str(uuid.uuid4()) + result = conn.sign_up(ClientId=client_id, Username=username, Password=password) + result["UserConfirmed"].should.be.false + result["UserSub"].should_not.be.none + + +@mock_cognitoidp +def test_confirm_sign_up(): + conn = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + password = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + conn.sign_up(ClientId=client_id, Username=username, Password=password) + + conn.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456", + ) + + result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) + result["UserStatus"].should.equal("CONFIRMED") + + +@mock_cognitoidp +def test_initiate_auth_USER_SRP_AUTH(): + conn = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + password = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + conn.sign_up(ClientId=client_id, Username=username, Password=password) + client_secret = conn.describe_user_pool_client( + UserPoolId=user_pool_id, ClientId=client_id, + )["UserPoolClient"]["ClientSecret"] + conn.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456", + ) + + key = bytes(str(client_secret).encode("latin-1")) + msg = bytes(str(username + client_id).encode("latin-1")) + new_digest = hmac.new(key, msg, hashlib.sha256).digest() + secret_hash = base64.b64encode(new_digest).decode() + + result = conn.initiate_auth( + ClientId=client_id, + AuthFlow="USER_SRP_AUTH", + AuthParameters={ + "USERNAME": username, + "SRP_A": str(uuid.uuid4()), + "SECRET_HASH": secret_hash, + }, + ) + + result["ChallengeName"].should.equal("PASSWORD_VERIFIER") + + +@mock_cognitoidp +def test_initiate_auth_REFRESH_TOKEN(): + conn = boto3.client("cognito-idp", "us-west-2") + result = user_authentication_flow(conn) + result = conn.initiate_auth( + ClientId=result["client_id"], + AuthFlow="REFRESH_TOKEN", + AuthParameters={ + "REFRESH_TOKEN": result["refresh_token"], + "SECRET_HASH": result["secret_hash"], + }, + ) + + result["AuthenticationResult"]["AccessToken"].should_not.be.none + + +@mock_cognitoidp +def test_initiate_auth_for_unconfirmed_user(): + conn = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + password = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + conn.sign_up(ClientId=client_id, Username=username, Password=password) + client_secret = conn.describe_user_pool_client( + UserPoolId=user_pool_id, ClientId=client_id, + )["UserPoolClient"]["ClientSecret"] + + key = bytes(str(client_secret).encode("latin-1")) + msg = bytes(str(username + client_id).encode("latin-1")) + new_digest = hmac.new(key, msg, hashlib.sha256).digest() + secret_hash = base64.b64encode(new_digest).decode() + + caught = False + try: + result = conn.initiate_auth( + ClientId=client_id, + AuthFlow="USER_SRP_AUTH", + AuthParameters={ + "USERNAME": username, + "SRP_A": str(uuid.uuid4()), + "SECRET_HASH": secret_hash, + }, + ) + except conn.exceptions.UserNotConfirmedException: + caught = True + + caught.should.be.true + + +@mock_cognitoidp +def test_initiate_auth_with_invalid_secret_hash(): + conn = boto3.client("cognito-idp", "us-west-2") + username = str(uuid.uuid4()) + password = str(uuid.uuid4()) + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + client_id = conn.create_user_pool_client( + UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True, + )["UserPoolClient"]["ClientId"] + conn.sign_up(ClientId=client_id, Username=username, Password=password) + client_secret = conn.describe_user_pool_client( + UserPoolId=user_pool_id, ClientId=client_id, + )["UserPoolClient"]["ClientSecret"] + conn.confirm_sign_up( + ClientId=client_id, Username=username, ConfirmationCode="123456", + ) + + invalid_secret_hash = str(uuid.uuid4()) + + caught = False + try: + result = conn.initiate_auth( + ClientId=client_id, + AuthFlow="USER_SRP_AUTH", + AuthParameters={ + "USERNAME": username, + "SRP_A": str(uuid.uuid4()), + "SECRET_HASH": invalid_secret_hash, + }, + ) + except conn.exceptions.NotAuthorizedException: + caught = True + + caught.should.be.true + + +@mock_cognitoidp +def test_setting_mfa(): + conn = boto3.client("cognito-idp", "us-west-2") + result = authentication_flow(conn) + conn.associate_software_token(AccessToken=result["access_token"]) + conn.verify_software_token(AccessToken=result["access_token"], UserCode="123456") + conn.set_user_mfa_preference( + AccessToken=result["access_token"], + SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, + ) + result = conn.admin_get_user( + UserPoolId=result["user_pool_id"], Username=result["username"] + ) + + result["UserMFASettingList"].should.have.length_of(1) + + +@mock_cognitoidp +def test_setting_mfa_when_token_not_verified(): + conn = boto3.client("cognito-idp", "us-west-2") + result = authentication_flow(conn) + conn.associate_software_token(AccessToken=result["access_token"]) + + caught = False + try: + conn.set_user_mfa_preference( + AccessToken=result["access_token"], + SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, + ) + except conn.exceptions.InvalidParameterException: + caught = True + + caught.should.be.true + + +@mock_cognitoidp +def test_respond_to_auth_challenge_with_invalid_secret_hash(): + conn = boto3.client("cognito-idp", "us-west-2") + result = user_authentication_flow(conn) + + valid_secret_hash = result["secret_hash"] + invalid_secret_hash = str(uuid.uuid4()) + + challenge = conn.initiate_auth( + ClientId=result["client_id"], + AuthFlow="USER_SRP_AUTH", + AuthParameters={ + "USERNAME": result["username"], + "SRP_A": str(uuid.uuid4()), + "SECRET_HASH": valid_secret_hash, + }, + ) + + challenge = conn.respond_to_auth_challenge( + ClientId=result["client_id"], + ChallengeName=challenge["ChallengeName"], + ChallengeResponses={ + "PASSWORD_CLAIM_SIGNATURE": str(uuid.uuid4()), + "PASSWORD_CLAIM_SECRET_BLOCK": challenge["Session"], + "TIMESTAMP": str(uuid.uuid4()), + "USERNAME": result["username"], + }, + ) + + caught = False + try: + conn.respond_to_auth_challenge( + ClientId=result["client_id"], + Session=challenge["Session"], + ChallengeName=challenge["ChallengeName"], + ChallengeResponses={ + "SOFTWARE_TOKEN_MFA_CODE": "123456", + "USERNAME": result["username"], + "SECRET_HASH": invalid_secret_hash, + }, + ) + except conn.exceptions.NotAuthorizedException: + caught = True + + caught.should.be.true + + # Test will retrieve public key from cognito.amazonaws.com/.well-known/jwks.json, # which isnt mocked in ServerMode if not settings.TEST_SERVER_MODE: