Implement User Pool MFA Actions (#3903)
* implement user pool mfa actions * Add messages to errors Add messages to errors Fix error message * Change exception type * fix validation & add more tests Co-authored-by: George Lewis <glewis@evertz.com>
This commit is contained in:
parent
29ecd32752
commit
f76571199f
@ -83,6 +83,10 @@ class CognitoIdpUserPool(BaseModel):
|
|||||||
self.creation_date = datetime.datetime.utcnow()
|
self.creation_date = datetime.datetime.utcnow()
|
||||||
self.last_modified_date = datetime.datetime.utcnow()
|
self.last_modified_date = datetime.datetime.utcnow()
|
||||||
|
|
||||||
|
self.mfa_config = "OFF"
|
||||||
|
self.sms_mfa_config = None
|
||||||
|
self.token_mfa_config = None
|
||||||
|
|
||||||
self.clients = OrderedDict()
|
self.clients = OrderedDict()
|
||||||
self.identity_providers = OrderedDict()
|
self.identity_providers = OrderedDict()
|
||||||
self.groups = OrderedDict()
|
self.groups = OrderedDict()
|
||||||
@ -105,6 +109,7 @@ class CognitoIdpUserPool(BaseModel):
|
|||||||
"Status": self.status,
|
"Status": self.status,
|
||||||
"CreationDate": time.mktime(self.creation_date.timetuple()),
|
"CreationDate": time.mktime(self.creation_date.timetuple()),
|
||||||
"LastModifiedDate": time.mktime(self.last_modified_date.timetuple()),
|
"LastModifiedDate": time.mktime(self.last_modified_date.timetuple()),
|
||||||
|
"MfaConfiguration": self.mfa_config,
|
||||||
}
|
}
|
||||||
|
|
||||||
def to_json(self, extended=False):
|
def to_json(self, extended=False):
|
||||||
@ -391,6 +396,25 @@ class CognitoIdpBackend(BaseBackend):
|
|||||||
self.user_pools[user_pool.id] = user_pool
|
self.user_pools[user_pool.id] = user_pool
|
||||||
return user_pool
|
return user_pool
|
||||||
|
|
||||||
|
def set_user_pool_mfa_config(
|
||||||
|
self, user_pool_id, sms_config, token_config, mfa_config
|
||||||
|
):
|
||||||
|
user_pool = self.describe_user_pool(user_pool_id)
|
||||||
|
user_pool.mfa_config = mfa_config
|
||||||
|
user_pool.sms_mfa_config = sms_config
|
||||||
|
user_pool.token_mfa_config = token_config
|
||||||
|
|
||||||
|
return self.get_user_pool_mfa_config(user_pool_id)
|
||||||
|
|
||||||
|
def get_user_pool_mfa_config(self, user_pool_id):
|
||||||
|
user_pool = self.describe_user_pool(user_pool_id)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"SmsMfaConfiguration": user_pool.sms_mfa_config,
|
||||||
|
"SoftwareTokenMfaConfiguration": user_pool.token_mfa_config,
|
||||||
|
"MfaConfiguration": user_pool.mfa_config,
|
||||||
|
}
|
||||||
|
|
||||||
@paginate(60)
|
@paginate(60)
|
||||||
def list_user_pools(self, max_results=None, next_token=None):
|
def list_user_pools(self, max_results=None, next_token=None):
|
||||||
return self.user_pools.values()
|
return self.user_pools.values()
|
||||||
|
@ -5,6 +5,7 @@ import os
|
|||||||
|
|
||||||
from moto.core.responses import BaseResponse
|
from moto.core.responses import BaseResponse
|
||||||
from .models import cognitoidp_backends, find_region_by_value, UserStatus
|
from .models import cognitoidp_backends, find_region_by_value, UserStatus
|
||||||
|
from .exceptions import InvalidParameterException
|
||||||
|
|
||||||
|
|
||||||
class CognitoIdpResponse(BaseResponse):
|
class CognitoIdpResponse(BaseResponse):
|
||||||
@ -20,6 +21,40 @@ class CognitoIdpResponse(BaseResponse):
|
|||||||
)
|
)
|
||||||
return json.dumps({"UserPool": user_pool.to_json(extended=True)})
|
return json.dumps({"UserPool": user_pool.to_json(extended=True)})
|
||||||
|
|
||||||
|
def set_user_pool_mfa_config(self):
|
||||||
|
user_pool_id = self._get_param("UserPoolId")
|
||||||
|
sms_config = self._get_param("SmsMfaConfiguration", None)
|
||||||
|
token_config = self._get_param("SoftwareTokenMfaConfiguration", None)
|
||||||
|
mfa_config = self._get_param("MfaConfiguration")
|
||||||
|
|
||||||
|
if mfa_config not in ["ON", "OFF", "OPTIONAL"]:
|
||||||
|
raise InvalidParameterException(
|
||||||
|
"[MfaConfiguration] must be one of 'ON', 'OFF', or 'OPTIONAL'."
|
||||||
|
)
|
||||||
|
|
||||||
|
if mfa_config in ["ON", "OPTIONAL"]:
|
||||||
|
if sms_config is None and token_config is None:
|
||||||
|
raise InvalidParameterException(
|
||||||
|
"At least one of [SmsMfaConfiguration] or [SoftwareTokenMfaConfiguration] must be provided."
|
||||||
|
)
|
||||||
|
if sms_config is not None:
|
||||||
|
if "SmsConfiguration" not in sms_config:
|
||||||
|
raise InvalidParameterException(
|
||||||
|
"[SmsConfiguration] is a required member of [SoftwareTokenMfaConfiguration]."
|
||||||
|
)
|
||||||
|
|
||||||
|
response = cognitoidp_backends[self.region].set_user_pool_mfa_config(
|
||||||
|
user_pool_id, sms_config, token_config, mfa_config
|
||||||
|
)
|
||||||
|
return json.dumps(response)
|
||||||
|
|
||||||
|
def get_user_pool_mfa_config(self):
|
||||||
|
user_pool_id = self._get_param("UserPoolId")
|
||||||
|
response = cognitoidp_backends[self.region].get_user_pool_mfa_config(
|
||||||
|
user_pool_id
|
||||||
|
)
|
||||||
|
return json.dumps(response)
|
||||||
|
|
||||||
def list_user_pools(self):
|
def list_user_pools(self):
|
||||||
max_results = self._get_param("MaxResults")
|
max_results = self._get_param("MaxResults")
|
||||||
next_token = self._get_param("NextToken", "0")
|
next_token = self._get_param("NextToken", "0")
|
||||||
|
@ -15,7 +15,7 @@ import boto3
|
|||||||
|
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
import sure # noqa
|
import sure # noqa
|
||||||
from botocore.exceptions import ClientError
|
from botocore.exceptions import ClientError, ParamValidationError
|
||||||
from jose import jws, jwk, jwt
|
from jose import jws, jwk, jwt
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@ -54,6 +54,102 @@ def test_list_user_pools():
|
|||||||
result["UserPools"][0]["Name"].should.equal(name)
|
result["UserPools"][0]["Name"].should.equal(name)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_cognitoidp
|
||||||
|
def test_set_user_pool_mfa_config():
|
||||||
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
|
||||||
|
name = str(uuid.uuid4())
|
||||||
|
user_pool_id = conn.create_user_pool(PoolName=name)["UserPool"]["Id"]
|
||||||
|
|
||||||
|
# Test error for when neither token nor sms configuration is provided
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.set_user_pool_mfa_config(
|
||||||
|
UserPoolId=user_pool_id, MfaConfiguration="ON",
|
||||||
|
)
|
||||||
|
|
||||||
|
ex.value.operation_name.should.equal("SetUserPoolMfaConfig")
|
||||||
|
ex.value.response["Error"]["Code"].should.equal("InvalidParameterException")
|
||||||
|
ex.value.response["Error"]["Message"].should.equal(
|
||||||
|
"At least one of [SmsMfaConfiguration] or [SoftwareTokenMfaConfiguration] must be provided."
|
||||||
|
)
|
||||||
|
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||||
|
|
||||||
|
# Test error for when sms config is missing `SmsConfiguration`
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.set_user_pool_mfa_config(
|
||||||
|
UserPoolId=user_pool_id, SmsMfaConfiguration={}, MfaConfiguration="ON",
|
||||||
|
)
|
||||||
|
|
||||||
|
ex.value.response["Error"]["Code"].should.equal("InvalidParameterException")
|
||||||
|
ex.value.response["Error"]["Message"].should.equal(
|
||||||
|
"[SmsConfiguration] is a required member of [SoftwareTokenMfaConfiguration]."
|
||||||
|
)
|
||||||
|
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||||
|
|
||||||
|
# Test error for when `SmsConfiguration` is missing `SnsCaller`
|
||||||
|
# This is asserted by boto3
|
||||||
|
with pytest.raises(ParamValidationError) as ex:
|
||||||
|
conn.set_user_pool_mfa_config(
|
||||||
|
UserPoolId=user_pool_id,
|
||||||
|
SmsMfaConfiguration={"SmsConfiguration": {}},
|
||||||
|
MfaConfiguration="ON",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test error for when `MfaConfiguration` is not one of the expected values
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.set_user_pool_mfa_config(
|
||||||
|
UserPoolId=user_pool_id,
|
||||||
|
SoftwareTokenMfaConfiguration={"Enabled": True},
|
||||||
|
MfaConfiguration="Invalid",
|
||||||
|
)
|
||||||
|
|
||||||
|
ex.value.response["Error"]["Code"].should.equal("InvalidParameterException")
|
||||||
|
ex.value.response["Error"]["Message"].should.equal(
|
||||||
|
"[MfaConfiguration] must be one of 'ON', 'OFF', or 'OPTIONAL'."
|
||||||
|
)
|
||||||
|
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||||
|
|
||||||
|
# Enable software token MFA
|
||||||
|
mfa_config = conn.set_user_pool_mfa_config(
|
||||||
|
UserPoolId=user_pool_id,
|
||||||
|
SoftwareTokenMfaConfiguration={"Enabled": True},
|
||||||
|
MfaConfiguration="ON",
|
||||||
|
)
|
||||||
|
|
||||||
|
mfa_config.shouldnt.have.key("SmsMfaConfiguration")
|
||||||
|
mfa_config["MfaConfiguration"].should.equal("ON")
|
||||||
|
mfa_config["SoftwareTokenMfaConfiguration"].should.equal({"Enabled": True})
|
||||||
|
|
||||||
|
# Response from describe should match
|
||||||
|
pool = conn.describe_user_pool(UserPoolId=user_pool_id)["UserPool"]
|
||||||
|
pool["MfaConfiguration"].should.equal("ON")
|
||||||
|
|
||||||
|
# Disable MFA
|
||||||
|
mfa_config = conn.set_user_pool_mfa_config(
|
||||||
|
UserPoolId=user_pool_id, MfaConfiguration="OFF",
|
||||||
|
)
|
||||||
|
|
||||||
|
mfa_config.shouldnt.have.key("SmsMfaConfiguration")
|
||||||
|
mfa_config.shouldnt.have.key("SoftwareTokenMfaConfiguration")
|
||||||
|
mfa_config["MfaConfiguration"].should.equal("OFF")
|
||||||
|
|
||||||
|
# Response from describe should match
|
||||||
|
pool = conn.describe_user_pool(UserPoolId=user_pool_id)["UserPool"]
|
||||||
|
pool["MfaConfiguration"].should.equal("OFF")
|
||||||
|
|
||||||
|
# `SnsCallerArn` needs to be at least 20 long
|
||||||
|
sms_config = {"SmsConfiguration": {"SnsCallerArn": "01234567890123456789"}}
|
||||||
|
|
||||||
|
# Enable SMS MFA
|
||||||
|
mfa_config = conn.set_user_pool_mfa_config(
|
||||||
|
UserPoolId=user_pool_id, SmsMfaConfiguration=sms_config, MfaConfiguration="ON",
|
||||||
|
)
|
||||||
|
|
||||||
|
mfa_config.shouldnt.have.key("SoftwareTokenMfaConfiguration")
|
||||||
|
mfa_config["SmsMfaConfiguration"].should.equal(sms_config)
|
||||||
|
mfa_config["MfaConfiguration"].should.equal("ON")
|
||||||
|
|
||||||
|
|
||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
def test_list_user_pools_returns_max_items():
|
def test_list_user_pools_returns_max_items():
|
||||||
conn = boto3.client("cognito-idp", "us-west-2")
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user