CognitoIDP - improve default UserPool response (#5074)
This commit is contained in:
parent
9ab77d505d
commit
8ad121282f
@ -215,7 +215,7 @@ class CognitoIdpUserPoolAttribute(BaseModel):
|
||||
"Required custom attributes are not supported currently."
|
||||
)
|
||||
self.required = False
|
||||
self._init_constraints(schema, None)
|
||||
self._init_constraints(schema, None, show_empty_constraints=True)
|
||||
|
||||
def _init_standard(self, schema):
|
||||
attribute_data_type = schema.get("AttributeDataType", None)
|
||||
@ -252,7 +252,9 @@ class CognitoIdpUserPoolAttribute(BaseModel):
|
||||
)
|
||||
self._init_constraints(schema, default_constraints)
|
||||
|
||||
def _init_constraints(self, schema, default_constraints):
|
||||
def _init_constraints(
|
||||
self, schema, default_constraints, show_empty_constraints=False
|
||||
):
|
||||
def numeric_limit(num, constraint_type):
|
||||
if not num:
|
||||
return
|
||||
@ -267,7 +269,7 @@ class CognitoIdpUserPoolAttribute(BaseModel):
|
||||
)
|
||||
return parsed
|
||||
|
||||
self.string_constraints = None
|
||||
self.string_constraints = {} if show_empty_constraints else None
|
||||
self.number_constraints = None
|
||||
|
||||
if "AttributeDataType" in schema:
|
||||
@ -295,6 +297,7 @@ class CognitoIdpUserPoolAttribute(BaseModel):
|
||||
f"user.{self.name}: Max length cannot be less than min length."
|
||||
)
|
||||
self.string_constraints = string_constraints
|
||||
self.number_constraints = None
|
||||
elif self.data_type == "Number":
|
||||
number_constraints = schema.get(
|
||||
"NumberAttributeConstraints", default_constraints
|
||||
@ -315,6 +318,10 @@ class CognitoIdpUserPoolAttribute(BaseModel):
|
||||
f"user.{self.name}: Max value cannot be less than min value."
|
||||
)
|
||||
self.number_constraints = number_constraints
|
||||
self.string_constraints = None
|
||||
else:
|
||||
self.number_constraints = None
|
||||
self.string_constraints = None
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
@ -328,6 +335,36 @@ class CognitoIdpUserPoolAttribute(BaseModel):
|
||||
}
|
||||
|
||||
|
||||
DEFAULT_USER_POOL_CONFIG = {
|
||||
"Policies": {
|
||||
"PasswordPolicy": {
|
||||
"MinimumLength": 8,
|
||||
"RequireUppercase": True,
|
||||
"RequireLowercase": True,
|
||||
"RequireNumbers": True,
|
||||
"RequireSymbols": True,
|
||||
"TemporaryPasswordValidityDays": 7,
|
||||
}
|
||||
},
|
||||
"AdminCreateUserConfig": {
|
||||
"AllowAdminCreateUserOnly": False,
|
||||
"UnusedAccountValidityDays": 7,
|
||||
"InviteMessageTemplate": {
|
||||
"SMSMessage": "Your username is {username} and temporary password is {####}. ",
|
||||
"EmailMessage": "Your username is {username} and temporary password is {####}. ",
|
||||
"EmailSubject": "Your temporary password",
|
||||
},
|
||||
},
|
||||
"EmailConfiguration": {"EmailSendingAccount": "COGNITO_DEFAULT"},
|
||||
"VerificationMessageTemplate": {
|
||||
"SmsMessage": "Your verification code is {####}. ",
|
||||
"EmailMessage": "Your verification code is {####}. ",
|
||||
"EmailSubject": "Your verification code",
|
||||
"DefaultEmailOption": "CONFIRM_WITH_CODE",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class CognitoIdpUserPool(BaseModel):
|
||||
def __init__(self, region, name, extended_config):
|
||||
self.region = region
|
||||
@ -337,7 +374,24 @@ class CognitoIdpUserPool(BaseModel):
|
||||
)
|
||||
self.name = name
|
||||
self.status = None
|
||||
self.extended_config = extended_config or {}
|
||||
|
||||
self.extended_config = DEFAULT_USER_POOL_CONFIG.copy()
|
||||
self.extended_config.update(extended_config or {})
|
||||
|
||||
message_template = self.extended_config.get("VerificationMessageTemplate")
|
||||
if message_template and "SmsVerificationMessage" not in extended_config:
|
||||
self.extended_config["SmsVerificationMessage"] = message_template.get(
|
||||
"SmsMessage"
|
||||
)
|
||||
if message_template and "EmailVerificationSubject" not in extended_config:
|
||||
self.extended_config["EmailVerificationSubject"] = message_template.get(
|
||||
"EmailSubject"
|
||||
)
|
||||
if message_template and "EmailVerificationMessage" not in extended_config:
|
||||
self.extended_config["EmailVerificationMessage"] = message_template.get(
|
||||
"EmailMessage"
|
||||
)
|
||||
|
||||
self.creation_date = datetime.datetime.utcnow()
|
||||
self.last_modified_date = datetime.datetime.utcnow()
|
||||
|
||||
@ -346,18 +400,19 @@ class CognitoIdpUserPool(BaseModel):
|
||||
self.token_mfa_config = None
|
||||
|
||||
self.schema_attributes = {}
|
||||
for schema in extended_config.pop("Schema", {}):
|
||||
for schema in self.extended_config.pop("Schema", {}):
|
||||
attribute = CognitoIdpUserPoolAttribute(
|
||||
schema["Name"],
|
||||
schema["Name"] not in CognitoIdpUserPoolAttribute.STANDARD_SCHEMA,
|
||||
schema,
|
||||
)
|
||||
self.schema_attributes[attribute.name] = attribute
|
||||
for (
|
||||
standard_attribute_name,
|
||||
standard_attribute_schema,
|
||||
) in CognitoIdpUserPoolAttribute.STANDARD_SCHEMA.items():
|
||||
if standard_attribute_name not in self.schema_attributes:
|
||||
# If we do not have custom attributes, use the standard schema
|
||||
if not self.schema_attributes:
|
||||
for (
|
||||
standard_attribute_name,
|
||||
standard_attribute_schema,
|
||||
) in CognitoIdpUserPoolAttribute.STANDARD_SCHEMA.items():
|
||||
self.schema_attributes[
|
||||
standard_attribute_name
|
||||
] = CognitoIdpUserPoolAttribute(
|
||||
@ -591,6 +646,9 @@ class CognitoIdpIdentityProvider(BaseModel):
|
||||
self.creation_date = datetime.datetime.utcnow()
|
||||
self.last_modified_date = datetime.datetime.utcnow()
|
||||
|
||||
if "AttributeMapping" not in self.extended_config:
|
||||
self.extended_config["AttributeMapping"] = {"username": "sub"}
|
||||
|
||||
def _base_json(self):
|
||||
return {
|
||||
"ProviderName": self.name,
|
||||
@ -1403,13 +1461,10 @@ class CognitoIdpBackend(BaseBackend):
|
||||
return resource_server
|
||||
|
||||
def sign_up(self, client_id, username, password, attributes):
|
||||
# This method may not be authenticated - which means we don't know which region the request was send to
|
||||
# Let's cycle through all regions to find out which one contains our client_id
|
||||
user_pool = None
|
||||
for backend in cognitoidp_backends.values():
|
||||
for p in backend.user_pools.values():
|
||||
if client_id in p.clients:
|
||||
user_pool = p
|
||||
for p in self.user_pools.values():
|
||||
if client_id in p.clients:
|
||||
user_pool = p
|
||||
if user_pool is None:
|
||||
raise ResourceNotFoundError(client_id)
|
||||
elif user_pool._get_user(username):
|
||||
@ -1698,7 +1753,37 @@ class CognitoIdpBackend(BaseBackend):
|
||||
user_pool.add_custom_attributes(custom_attributes)
|
||||
|
||||
|
||||
class GlobalCognitoIdpBackend(CognitoIdpBackend):
|
||||
"""
|
||||
Some operations are unauthenticated
|
||||
Without authentication-header, we lose the context of which region the request was send to
|
||||
This backend will cycle through all backends as a workaround
|
||||
"""
|
||||
|
||||
def _find_backend_for_clientid(self, client_id):
|
||||
for region, backend in cognitoidp_backends.items():
|
||||
if region == "global":
|
||||
continue
|
||||
for p in backend.user_pools.values():
|
||||
if client_id in p.clients:
|
||||
return backend
|
||||
return cognitoidp_backends["us-east-1"]
|
||||
|
||||
def sign_up(self, client_id, username, password, attributes):
|
||||
backend = self._find_backend_for_clientid(client_id)
|
||||
return backend.sign_up(client_id, username, password, attributes)
|
||||
|
||||
def initiate_auth(self, client_id, auth_flow, auth_parameters):
|
||||
backend = self._find_backend_for_clientid(client_id)
|
||||
return backend.initiate_auth(client_id, auth_flow, auth_parameters)
|
||||
|
||||
def confirm_sign_up(self, client_id, username):
|
||||
backend = self._find_backend_for_clientid(client_id)
|
||||
return backend.confirm_sign_up(client_id, username)
|
||||
|
||||
|
||||
cognitoidp_backends = BackendDict(CognitoIdpBackend, "cognito-idp")
|
||||
cognitoidp_backends["global"] = GlobalCognitoIdpBackend("global")
|
||||
|
||||
|
||||
# Hack to help moto-server process requests on localhost, where the region isn't
|
||||
|
@ -516,7 +516,7 @@ class CognitoIdpResponse(BaseResponse):
|
||||
client_id = self._get_param("ClientId")
|
||||
username = self._get_param("Username")
|
||||
password = self._get_param("Password")
|
||||
user = cognitoidp_backends[self.region].sign_up(
|
||||
user = cognitoidp_backends["global"].sign_up(
|
||||
client_id=client_id,
|
||||
username=username,
|
||||
password=password,
|
||||
@ -532,7 +532,7 @@ class CognitoIdpResponse(BaseResponse):
|
||||
def confirm_sign_up(self):
|
||||
client_id = self._get_param("ClientId")
|
||||
username = self._get_param("Username")
|
||||
cognitoidp_backends[self.region].confirm_sign_up(
|
||||
cognitoidp_backends["global"].confirm_sign_up(
|
||||
client_id=client_id, username=username
|
||||
)
|
||||
return ""
|
||||
@ -542,7 +542,7 @@ class CognitoIdpResponse(BaseResponse):
|
||||
auth_flow = self._get_param("AuthFlow")
|
||||
auth_parameters = self._get_param("AuthParameters")
|
||||
|
||||
auth_result = cognitoidp_backends[self.region].initiate_auth(
|
||||
auth_result = cognitoidp_backends["global"].initiate_auth(
|
||||
client_id, auth_flow, auth_parameters
|
||||
)
|
||||
|
||||
|
@ -22,6 +22,13 @@ cloudwatch:
|
||||
- TestAccCloudWatchLogsDestination
|
||||
- TestAccCloudWatchLogsDestinationPolicy
|
||||
- TestAccCloudWatchLogsGroupDataSource
|
||||
cognitoidp:
|
||||
- TestAccCognitoIDPIdentityProvider
|
||||
- TestAccCognitoIDPUserPool_
|
||||
- TestAccCognitoUser_
|
||||
- TestAccCognitoUserInGroup_
|
||||
- TestAccCognitoIDPUserPoolClients
|
||||
- TestAccCognitoIDPUserPoolClientDataSource
|
||||
dynamodb:
|
||||
- TestAccDynamoDBTableItem
|
||||
ec2:
|
||||
|
@ -42,6 +42,26 @@ def test_create_user_pool():
|
||||
result["UserPool"]["LambdaConfig"]["PreSignUp"].should.equal(value)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_create_user_pool__overwrite_template_messages():
|
||||
client = boto3.client("cognito-idp", "us-east-2")
|
||||
resp = client.create_user_pool(
|
||||
PoolName="test",
|
||||
VerificationMessageTemplate={
|
||||
"DefaultEmailOption": "CONFIRM_WITH_LINK",
|
||||
"EmailMessage": "foo {####} bar",
|
||||
"EmailMessageByLink": "{##foobar##}",
|
||||
"EmailSubject": "foobar {####}",
|
||||
"EmailSubjectByLink": "foobar",
|
||||
"SmsMessage": "{####} baz",
|
||||
},
|
||||
)
|
||||
pool = resp["UserPool"]
|
||||
pool.should.have.key("SmsVerificationMessage").equals("{####} baz")
|
||||
pool.should.have.key("EmailVerificationSubject").equals("foobar {####}")
|
||||
pool.should.have.key("EmailVerificationMessage").equals("foo {####} bar")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_create_user_pool_should_have_all_default_attributes_in_schema():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
@ -127,7 +147,6 @@ def test_create_user_pool_custom_attribute_defaults():
|
||||
)
|
||||
string_attribute["DeveloperOnlyAttribute"].should.equal(False)
|
||||
string_attribute["Mutable"].should.equal(True)
|
||||
string_attribute.shouldnt.have.key("StringAttributeConstraints")
|
||||
|
||||
number_attribute = next(
|
||||
attr
|
||||
@ -1200,12 +1219,14 @@ def test_update_identity_provider():
|
||||
UserPoolId=user_pool_id,
|
||||
ProviderName=provider_name,
|
||||
ProviderDetails={"thing": new_value},
|
||||
)
|
||||
AttributeMapping={"email": "email", "username": "sub"},
|
||||
)["IdentityProvider"]
|
||||
|
||||
result["IdentityProvider"]["UserPoolId"].should.equal(user_pool_id)
|
||||
result["IdentityProvider"]["ProviderName"].should.equal(provider_name)
|
||||
result["IdentityProvider"]["ProviderType"].should.equal(provider_type)
|
||||
result["IdentityProvider"]["ProviderDetails"]["thing"].should.equal(new_value)
|
||||
result["UserPoolId"].should.equal(user_pool_id)
|
||||
result["ProviderName"].should.equal(provider_name)
|
||||
result["ProviderType"].should.equal(provider_type)
|
||||
result["ProviderDetails"]["thing"].should.equal(new_value)
|
||||
result["AttributeMapping"].should.equal({"email": "email", "username": "sub"})
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
|
Loading…
Reference in New Issue
Block a user