Add add_custom_attributes implementation (#4498)

This commit is contained in:
Łukasz 2021-10-30 11:58:29 +02:00 committed by GitHub
parent ecdd395bec
commit 56866e0ef5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 9 deletions

View File

@ -162,16 +162,16 @@ class CognitoIdpUserPoolAttribute(BaseModel):
},
}
ATTRIBUTE_DATE_TYPES = {"Boolean", "DateTime", "String", "Number"}
ATTRIBUTE_DATA_TYPES = {"Boolean", "DateTime", "String", "Number"}
def __init__(self, name, schema):
def __init__(self, name, custom, schema):
self.name = name
self.custom = self.name not in CognitoIdpUserPoolAttribute.STANDARD_SCHEMA
self.custom = custom
attribute_data_type = schema.get("AttributeDataType", None)
if (
attribute_data_type
and attribute_data_type
not in CognitoIdpUserPoolAttribute.ATTRIBUTE_DATE_TYPES
not in CognitoIdpUserPoolAttribute.ATTRIBUTE_DATA_TYPES
):
raise InvalidParameterException(
f"Validation error detected: Value '{attribute_data_type}' failed to satisfy constraint: Member must satisfy enum value set: [Boolean, Number, String, DateTime]"
@ -330,10 +330,14 @@ class CognitoIdpUserPool(BaseModel):
self.sms_mfa_config = None
self.token_mfa_config = None
self.schema_attributes = {
schema["Name"]: CognitoIdpUserPoolAttribute(schema["Name"], schema)
for schema in extended_config.pop("Schema", {})
}
self.schema_attributes = {}
for schema in extended_config.pop("Schema", {}):
attribute = CognitoIdpUserPoolAttribute(
schema["Name"],
schema["Name"] not in CognitoIdpUserPoolAttribute.STANDARD_SCHEMA,
schema,
)
self.schema_attributes[attribute.name] = attribute
for (
standard_attribute_name,
standard_attribute_schema,
@ -342,7 +346,7 @@ class CognitoIdpUserPool(BaseModel):
self.schema_attributes[
standard_attribute_name
] = CognitoIdpUserPoolAttribute(
standard_attribute_name, standard_attribute_schema
standard_attribute_name, False, standard_attribute_schema
)
self.clients = OrderedDict()
@ -439,6 +443,22 @@ class CognitoIdpUserPool(BaseModel):
expires_in,
)
def add_custom_attributes(self, custom_attributes):
attributes = []
for attribute_schema in custom_attributes:
base_name = attribute_schema["Name"]
target_name = "custom:" + base_name
if attribute_schema.get("DeveloperOnlyAttribute", False):
target_name = "dev:" + target_name
if target_name in self.schema_attributes:
raise InvalidParameterException(
f"custom:{base_name}: Existing attribute already has name {target_name}."
)
attribute = CognitoIdpUserPoolAttribute(base_name, True, attribute_schema)
attributes.append(attribute)
for attribute in attributes:
self.schema_attributes[attribute.name] = attribute
def create_id_token(self, client_id, username):
extra_data = self.get_user_extra_data_by_client_id(client_id, username)
id_token, expires_in = self.create_jwt(
@ -1553,6 +1573,10 @@ class CognitoIdpBackend(BaseBackend):
else:
user.status = UserStatus.FORCE_CHANGE_PASSWORD
def add_custom_attributes(self, user_pool_id, custom_attributes):
user_pool = self.describe_user_pool(user_pool_id)
user_pool.add_custom_attributes(custom_attributes)
cognitoidp_backends = {}
for region in Session().get_available_regions("cognito-idp"):

View File

@ -575,6 +575,14 @@ class CognitoIdpResponse(BaseResponse):
)
return ""
def add_custom_attributes(self):
user_pool_id = self._get_param("UserPoolId")
custom_attributes = self._get_param("CustomAttributes")
cognitoidp_backends[self.region].add_custom_attributes(
user_pool_id, custom_attributes
)
return ""
class CognitoIdpJsonWebKeyResponse(BaseResponse):
def __init__(self):

View File

@ -441,6 +441,53 @@ def test_create_user_pool_number_schema_min_bigger_than_max():
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@mock_cognitoidp
def test_add_custom_attributes():
conn = boto3.client("cognito-idp", "us-west-2")
pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
custom_attribute = {"Name": "banana", "AttributeDataType": "String"}
res = conn.add_custom_attributes(
UserPoolId=pool_id, CustomAttributes=[custom_attribute]
)
res["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
res = conn.describe_user_pool(UserPoolId=pool_id)
described_attribute = next(
attr
for attr in res["UserPool"]["SchemaAttributes"]
if attr["Name"] == "custom:banana"
)
# Skip verification - already covered by create_user_pool with custom attributes
described_attribute.should_not.be.none
@mock_cognitoidp
def test_add_custom_attributes_existing_attribute():
conn = boto3.client("cognito-idp", "us-west-2")
custom_attribute = {
"Name": "banana",
"AttributeDataType": "String",
"DeveloperOnlyAttribute": True,
}
pool_id = conn.create_user_pool(
PoolName=str(uuid.uuid4()), Schema=[custom_attribute]
)["UserPool"]["Id"]
with pytest.raises(ClientError) as ex:
conn.add_custom_attributes(
UserPoolId=pool_id, CustomAttributes=[custom_attribute]
)
ex.value.response["Error"]["Code"].should.equal("InvalidParameterException")
ex.value.response["Error"]["Message"].should.equal(
f"custom:banana: Existing attribute already has name dev:custom:banana."
)
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@mock_cognitoidp
def test_list_user_pools():
conn = boto3.client("cognito-idp", "us-west-2")