Identitystore - Partial implementation (#6411)

This commit is contained in:
jeremyrobertson 2023-06-21 03:49:49 -06:00 committed by GitHub
parent b11cc6a5fe
commit 5de95f4d02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1029 additions and 9 deletions

View File

@ -1 +1,37 @@
"""Exceptions raised by the identitystore service.""" """Exceptions raised by the identitystore service."""
import json
from moto.core.exceptions import AWSError
from typing import Any
request_id = "178936da-50ad-4d58-8871-22d9979e8658example"
class IdentityStoreError(AWSError):
def __init__(self, **kwargs: Any):
super(AWSError, self).__init__(error_type=self.TYPE, message=kwargs["message"]) # type: ignore
self.description: str = json.dumps(
{
"__type": self.error_type,
"RequestId": request_id,
"Message": self.message,
"ResourceType": kwargs.get("resource_type"),
"Reason": kwargs.get("reason"),
}
)
class ResourceNotFoundException(IdentityStoreError):
TYPE = "ResourceNotFoundException"
code = 400
class ValidationException(IdentityStoreError):
TYPE = "ValidationException"
code = 400
class ConflictException(IdentityStoreError):
TYPE = "ConflictException"
code = 400

View File

@ -1,19 +1,98 @@
from typing import Dict, Tuple from typing import Dict, Tuple, List, Any, NamedTuple, Optional
from typing_extensions import Self
from moto.utilities.paginator import paginate
from botocore.exceptions import ParamValidationError
from moto.moto_api._internal import mock_random from moto.moto_api._internal import mock_random
from moto.core import BaseBackend, BackendDict from moto.core import BaseBackend, BackendDict
from .exceptions import (
ResourceNotFoundException,
ValidationException,
ConflictException,
)
import warnings
class Name(NamedTuple):
Formatted: Optional[str]
FamilyName: Optional[str]
GivenName: Optional[str]
MiddleName: Optional[str]
HonorificPrefix: Optional[str]
HonorificSuffix: Optional[str]
@classmethod
def from_dict(cls, name_dict: Dict[str, str]) -> Optional[Self]:
if not name_dict:
return None
return cls(
name_dict.get("Formatted"),
name_dict.get("FamilyName"),
name_dict.get("GivenName"),
name_dict.get("MiddleName"),
name_dict.get("HonorificPrefix"),
name_dict.get("HonorificSuffix"),
)
class User(NamedTuple):
UserId: str
IdentityStoreId: str
UserName: str
Name: Optional[Name]
DisplayName: str
NickName: str
ProfileUrl: str
Emails: List[Dict[str, str]]
Addresses: List[Dict[str, str]]
PhoneNumbers: List[Dict[str, str]]
UserType: str
Title: str
PreferredLanguage: str
Locale: str
Timezone: str
class IdentityStoreData:
def __init__(self) -> None:
self.groups: Dict[str, Dict[str, str]] = {}
self.users: Dict[str, User] = {}
self.group_memberships: Dict[str, Any] = {}
class IdentityStoreBackend(BaseBackend): class IdentityStoreBackend(BaseBackend):
"""Implementation of IdentityStore APIs.""" """Implementation of IdentityStore APIs."""
def __init__(self, region_name: str, account_id: str): PAGINATION_MODEL = {
"list_group_memberships": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "MembershipId",
},
}
def __init__(self, region_name: str, account_id: str) -> None:
super().__init__(region_name, account_id) super().__init__(region_name, account_id)
self.groups: Dict[str, Dict[str, str]] = {} self.identity_stores: Dict[str, IdentityStoreData] = {}
def create_group( def create_group(
self, identity_store_id: str, display_name: str, description: str self, identity_store_id: str, display_name: str, description: str
) -> Tuple[str, str]: ) -> Tuple[str, str]:
identity_store = self.__get_identity_store(identity_store_id)
matching = [
g
for g in identity_store.groups.values()
if g["DisplayName"] == display_name
]
if len(matching) > 0:
raise ConflictException(
message="Duplicate GroupDisplayName",
reason="UNIQUENESS_CONSTRAINT_VIOLATION",
)
group_id = str(mock_random.uuid4()) group_id = str(mock_random.uuid4())
group_dict = { group_dict = {
"GroupId": group_id, "GroupId": group_id,
@ -21,8 +100,180 @@ class IdentityStoreBackend(BaseBackend):
"DisplayName": display_name, "DisplayName": display_name,
"Description": description, "Description": description,
} }
self.groups[group_id] = group_dict identity_store.groups[group_id] = group_dict
return group_id, identity_store_id return group_id, identity_store_id
def get_group_id(
self, identity_store_id: str, alternate_identifier: Dict[str, Any]
) -> Tuple[str, str]:
identity_store = self.__get_identity_store(identity_store_id)
if "UniqueAttribute" in alternate_identifier:
if (
"AttributeValue" in alternate_identifier["UniqueAttribute"]
and alternate_identifier["UniqueAttribute"]["AttributePath"].lower()
== "displayname"
):
for g in identity_store.groups.values():
if (
g["DisplayName"]
== alternate_identifier["UniqueAttribute"]["AttributeValue"]
):
return g["GroupId"], identity_store_id
elif "ExternalId" in alternate_identifier:
warnings.warn("ExternalId has not been implemented.")
raise ResourceNotFoundException(
message="GROUP not found.", resource_type="GROUP"
)
def delete_group(self, identity_store_id: str, group_id: str) -> None:
identity_store = self.__get_identity_store(identity_store_id)
if group_id in identity_store.groups:
del identity_store.groups[group_id]
def create_user(
self,
identity_store_id: str,
user_name: str,
name: Dict[str, str],
display_name: str,
nick_name: str,
profile_url: str,
emails: List[Dict[str, Any]],
addresses: List[Dict[str, Any]],
phone_numbers: List[Dict[str, Any]],
user_type: str,
title: str,
preferred_language: str,
locale: str,
timezone: str,
) -> Tuple[str, str]:
identity_store = self.__get_identity_store(identity_store_id)
user_id = str(mock_random.uuid4())
new_user = User(
user_id,
identity_store_id,
user_name,
Name.from_dict(name),
display_name,
nick_name,
profile_url,
emails,
addresses,
phone_numbers,
user_type,
title,
preferred_language,
locale,
timezone,
)
self.__validate_create_user(new_user, identity_store)
identity_store.users[user_id] = new_user
return user_id, identity_store_id
def describe_user(self, identity_store_id: str, user_id: str) -> User:
identity_store = self.__get_identity_store(identity_store_id)
if user_id in identity_store.users:
return identity_store.users[user_id]
raise ResourceNotFoundException(message="USER not found.", resource_type="USER")
def delete_user(self, identity_store_id: str, user_id: str) -> None:
identity_store = self.__get_identity_store(identity_store_id)
if user_id in identity_store.users:
del identity_store.users[user_id]
def create_group_membership(
self, identity_store_id: str, group_id: str, member_id: Dict[str, str]
) -> Tuple[str, str]:
identity_store = self.__get_identity_store(identity_store_id)
user_id = member_id["UserId"]
if user_id not in identity_store.users:
raise ResourceNotFoundException(
message="Member does not exist", resource_type="USER"
)
if group_id not in identity_store.groups:
raise ResourceNotFoundException(
message="Group does not exist", resource_type="GROUP"
)
membership_id = str(mock_random.uuid4())
identity_store.group_memberships[membership_id] = {
"IdentityStoreId": identity_store_id,
"MembershipId": membership_id,
"GroupId": group_id,
"MemberId": {"UserId": user_id},
}
return membership_id, identity_store_id
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore
def list_group_memberships(
self,
identity_store_id: str,
group_id: str,
) -> List[Any]: # type: ignore
identity_store = self.__get_identity_store(identity_store_id)
return [
m
for m in identity_store.group_memberships.values()
if m["GroupId"] == group_id
]
def delete_group_membership(
self, identity_store_id: str, membership_id: str
) -> None:
identity_store = self.__get_identity_store(identity_store_id)
if membership_id in identity_store.group_memberships:
del identity_store.group_memberships[membership_id]
def __get_identity_store(self, store_id: str) -> IdentityStoreData:
if len(store_id) < 1:
raise ParamValidationError(
msg="Invalid length for parameter IdentityStoreId, value: 0, valid min length: 1"
)
if store_id not in self.identity_stores:
self.identity_stores[store_id] = IdentityStoreData()
return self.identity_stores[store_id]
def __validate_create_user(
self, new_user: User, identity_store: IdentityStoreData
) -> None:
if not new_user.UserName:
raise ValidationException(message="userName is a required attribute")
missing = []
if not new_user.DisplayName:
missing.append("displayname")
if not new_user.Name:
missing.append("name")
else:
if not new_user.Name.GivenName:
missing.append("givenname")
if not new_user.Name.FamilyName:
missing.append("familyname")
if len(missing) > 0:
message = ", ".join(
[f"{att}: The attribute {att} is required" for att in missing]
)
raise ValidationException(message=message)
matching = [
u for u in identity_store.users.values() if u.UserName == new_user.UserName
]
if len(matching) > 0:
raise ConflictException(
message="Duplicate UserName", reason="UNIQUENESS_CONSTRAINT_VIOLATION"
)
identitystore_backends = BackendDict(IdentityStoreBackend, "identitystore") identitystore_backends = BackendDict(IdentityStoreBackend, "identitystore")

View File

@ -1,5 +1,6 @@
"""Handles incoming identitystore requests, invokes methods, returns responses.""" """Handles incoming identitystore requests, invokes methods, returns responses."""
import json import json
from typing import NamedTuple, Any, Dict, Optional
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import identitystore_backends, IdentityStoreBackend from .models import identitystore_backends, IdentityStoreBackend
@ -26,3 +27,145 @@ class IdentityStoreResponse(BaseResponse):
description=description, description=description,
) )
return json.dumps(dict(GroupId=group_id, IdentityStoreId=identity_store_id)) return json.dumps(dict(GroupId=group_id, IdentityStoreId=identity_store_id))
def create_group_membership(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
group_id = self._get_param("GroupId")
member_id = self._get_param("MemberId")
(
membership_id,
identity_store_id,
) = self.identitystore_backend.create_group_membership(
identity_store_id=identity_store_id,
group_id=group_id,
member_id=member_id,
)
return json.dumps(
dict(MembershipId=membership_id, IdentityStoreId=identity_store_id)
)
def create_user(self) -> str:
user_id, identity_store_id = self.identitystore_backend.create_user(
self._get_param("IdentityStoreId"),
self._get_param("UserName"),
self._get_param("Name"),
self._get_param("DisplayName"),
self._get_param("NickName"),
self._get_param("ProfileUrl"),
self._get_param("Emails"),
self._get_param("Addresses"),
self._get_param("PhoneNumbers"),
self._get_param("UserType"),
self._get_param("Title"),
self._get_param("PreferredLanguage"),
self._get_param("Locale"),
self._get_param("Timezone"),
)
return json.dumps(dict(UserId=user_id, IdentityStoreId=identity_store_id))
def get_group_id(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
alternate_identifier = self._get_param("AlternateIdentifier")
group_id, identity_store_id = self.identitystore_backend.get_group_id(
identity_store_id=identity_store_id,
alternate_identifier=alternate_identifier,
)
return json.dumps(dict(GroupId=group_id, IdentityStoreId=identity_store_id))
def describe_user(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
user_id = self._get_param("UserId")
(
user_id,
identity_store_id,
user_name,
name,
display_name,
nick_name,
profile_url,
emails,
addresses,
phone_numbers,
user_type,
title,
preferred_language,
locale,
timezone,
) = self.identitystore_backend.describe_user(
identity_store_id=identity_store_id,
user_id=user_id,
)
return json.dumps(
dict(
UserName=user_name,
UserId=user_id,
ExternalIds=None,
Name=self.named_tuple_to_dict(name),
DisplayName=display_name,
NickName=nick_name,
ProfileUrl=profile_url,
Emails=emails,
Addresses=addresses,
PhoneNumbers=phone_numbers,
UserType=user_type,
Title=title,
PreferredLanguage=preferred_language,
Locale=locale,
Timezone=timezone,
IdentityStoreId=identity_store_id,
)
)
def list_group_memberships(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
group_id = self._get_param("GroupId")
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken")
(
group_memberships,
next_token,
) = self.identitystore_backend.list_group_memberships(
identity_store_id=identity_store_id,
group_id=group_id,
max_results=max_results,
next_token=next_token,
)
return json.dumps(
dict(GroupMemberships=group_memberships, NextToken=next_token)
)
def delete_group(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
group_id = self._get_param("GroupId")
self.identitystore_backend.delete_group(
identity_store_id=identity_store_id,
group_id=group_id,
)
return json.dumps(dict())
def delete_group_membership(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
membership_id = self._get_param("MembershipId")
self.identitystore_backend.delete_group_membership(
identity_store_id=identity_store_id,
membership_id=membership_id,
)
return json.dumps(dict())
def delete_user(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
user_id = self._get_param("UserId")
self.identitystore_backend.delete_user(
identity_store_id=identity_store_id,
user_id=user_id,
)
return json.dumps(dict())
def named_tuple_to_dict(
self, value: Optional[NamedTuple]
) -> Optional[Dict[str, Any]]:
if value:
return value._asdict()
return None

View File

@ -1,6 +1,5 @@
import logging import logging
from . import helpers # noqa
# Disable extra logging for tests # Disable extra logging for tests
logging.getLogger("boto").setLevel(logging.CRITICAL) logging.getLogger("boto").setLevel(logging.CRITICAL)

View File

@ -22,6 +22,7 @@ from moto.cloudformation import cloudformation_backends
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests import EXAMPLE_AMI_ID from tests import EXAMPLE_AMI_ID
from tests.helpers import match_dict # noqa # pylint: disable=unused-import
dummy_template = { dummy_template = {
"AWSTemplateFormatVersion": "2010-09-09", "AWSTemplateFormatVersion": "2010-09-09",

View File

@ -29,6 +29,9 @@ from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests import EXAMPLE_AMI_ID, EXAMPLE_AMI_ID2 from tests import EXAMPLE_AMI_ID, EXAMPLE_AMI_ID2
from tests.markers import requires_docker from tests.markers import requires_docker
from tests.test_cloudformation.fixtures import fn_join, single_instance_with_ebs_volume from tests.test_cloudformation.fixtures import fn_join, single_instance_with_ebs_volume
from tests.helpers import ( # noqa # pylint: disable=unused-import
containing_item_with_attributes,
)
@mock_cloudformation @mock_cloudformation

View File

@ -1,20 +1,28 @@
"""Unit tests for identitystore-supported APIs.""" """Unit tests for identitystore-supported APIs."""
from uuid import UUID import random
import string
from uuid import UUID, uuid4
import boto3 import boto3
import sure # noqa # pylint: disable=unused-import import pytest
from botocore.exceptions import ClientError
from moto import mock_identitystore from moto import mock_identitystore
from moto.moto_api._internal import mock_random
# See our Development Tips on writing tests for hints on how to write good tests: # See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html # http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
def get_identity_store_id() -> str:
return f"d-{random.choices(string.ascii_lowercase, k=10)}"
@mock_identitystore @mock_identitystore
def test_create_group(): def test_create_group():
client = boto3.client("identitystore", region_name="ap-southeast-1") client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = "d-9067028cf5" identity_store_id = get_identity_store_id()
create_resp = client.create_group( create_resp = client.create_group(
IdentityStoreId=identity_store_id, IdentityStoreId=identity_store_id,
DisplayName="test_group", DisplayName="test_group",
@ -22,3 +30,581 @@ def test_create_group():
) )
assert create_resp["IdentityStoreId"] == identity_store_id assert create_resp["IdentityStoreId"] == identity_store_id
assert UUID(create_resp["GroupId"]) assert UUID(create_resp["GroupId"])
@mock_identitystore
def test_create_group_duplicate_name():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
create_resp = client.create_group(
IdentityStoreId=identity_store_id,
DisplayName="test_group",
Description="description",
)
assert create_resp["IdentityStoreId"] == identity_store_id
assert UUID(create_resp["GroupId"])
with pytest.raises(ClientError) as exc:
client.create_group(
IdentityStoreId=identity_store_id,
DisplayName="test_group",
Description="description",
)
err = exc.value
assert "ConflictException" in str(type(err))
assert (
str(err)
== "An error occurred (ConflictException) when calling the CreateGroup operation: Duplicate GroupDisplayName"
)
assert err.operation_name == "CreateGroup"
assert err.response["Error"]["Code"] == "ConflictException"
assert err.response["Error"]["Message"] == "Duplicate GroupDisplayName"
assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert err.response["Message"] == "Duplicate GroupDisplayName"
assert err.response["Reason"] == "UNIQUENESS_CONSTRAINT_VIOLATION"
@mock_identitystore
def test_group_multiple_identity_stores():
identity_store_id = get_identity_store_id()
identity_store_id2 = get_identity_store_id()
client = boto3.client("identitystore", region_name="us-east-2")
group1 = __create_test_group(client, store_id=identity_store_id)
group2 = __create_test_group(client, store_id=identity_store_id2)
assert __group_exists(client, group1[0], store_id=identity_store_id)
assert not __group_exists(client, group1[0], store_id=identity_store_id2)
assert __group_exists(client, group2[0], store_id=identity_store_id2)
assert not __group_exists(client, group2[0], store_id=identity_store_id)
@mock_identitystore
def test_create_group_membership():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
group_id = client.create_group(
IdentityStoreId=identity_store_id,
DisplayName="test_group",
Description="description",
)["GroupId"]
user_id = __create_and_verify_sparse_user(client, identity_store_id)
create_response = client.create_group_membership(
IdentityStoreId=identity_store_id,
GroupId=group_id,
MemberId={"UserId": user_id},
)
assert UUID(create_response["MembershipId"])
assert create_response["IdentityStoreId"] == identity_store_id
list_response = client.list_group_memberships(
IdentityStoreId=identity_store_id, GroupId=group_id
)
assert len(list_response["GroupMemberships"]) == 1
assert (
list_response["GroupMemberships"][0]["MembershipId"]
== create_response["MembershipId"]
)
assert list_response["GroupMemberships"][0]["MemberId"]["UserId"] == user_id
@mock_identitystore
def test_create_duplicate_username():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
# This should succeed
client.create_user(
IdentityStoreId=identity_store_id,
UserName="deleteme_username",
DisplayName="deleteme_displayname",
Name={"GivenName": "Givenname", "FamilyName": "Familyname"},
)
with pytest.raises(ClientError) as exc:
# This should fail
client.create_user(
IdentityStoreId=identity_store_id,
UserName="deleteme_username",
DisplayName="deleteme_displayname",
Name={"GivenName": "Givenname", "FamilyName": "Familyname"},
)
err = exc.value
assert "ConflictException" in str(type(err))
assert (
str(err)
== "An error occurred (ConflictException) when calling the CreateUser operation: Duplicate UserName"
)
assert err.operation_name == "CreateUser"
assert err.response["Error"]["Code"] == "ConflictException"
assert err.response["Error"]["Message"] == "Duplicate UserName"
assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert err.response["Message"] == "Duplicate UserName"
assert err.response["Reason"] == "UNIQUENESS_CONSTRAINT_VIOLATION"
@mock_identitystore
def test_create_username_no_username():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
with pytest.raises(ClientError) as exc:
client.create_user(IdentityStoreId=identity_store_id)
err = exc.value
assert "ValidationException" in str(type(err))
assert (
str(err)
== "An error occurred (ValidationException) when calling the CreateUser operation: userName is a required attribute"
)
assert err.operation_name == "CreateUser"
assert err.response["Error"]["Code"] == "ValidationException"
assert err.response["Error"]["Message"] == "userName is a required attribute"
assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert err.response["Message"] == "userName is a required attribute"
@mock_identitystore
def test_create_username_missing_required_attributes():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
with pytest.raises(ClientError) as exc:
client.create_user(
IdentityStoreId=identity_store_id, UserName="username", Name={}
)
err = exc.value
assert "ValidationException" in str(type(err))
assert (
str(err)
== "An error occurred (ValidationException) when calling the CreateUser operation: displayname: The attribute displayname is required, name: The attribute name is required"
)
assert err.operation_name == "CreateUser"
assert err.response["Error"]["Code"] == "ValidationException"
assert (
err.response["Error"]["Message"]
== "displayname: The attribute displayname is required, name: The attribute name is required"
)
assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert (
err.response["Message"]
== "displayname: The attribute displayname is required, name: The attribute name is required"
)
@mock_identitystore
@pytest.mark.parametrize(
"field, missing", [("GivenName", "familyname"), ("FamilyName", "givenname")]
)
def test_create_username_missing_required_name_field(field, missing):
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
with pytest.raises(ClientError) as exc:
client.create_user(
IdentityStoreId=identity_store_id,
UserName="username",
DisplayName="displayName",
Name={field: field},
)
err = exc.value
assert "ValidationException" in str(type(err))
assert (
str(err)
== f"An error occurred (ValidationException) when calling the CreateUser operation: {missing}: The attribute {missing} is required"
)
assert err.operation_name == "CreateUser"
assert err.response["Error"]["Code"] == "ValidationException"
assert (
err.response["Error"]["Message"]
== f"{missing}: The attribute {missing} is required"
)
assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert err.response["Message"] == f"{missing}: The attribute {missing} is required"
@mock_identitystore
def test_create_describe_sparse_user():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
response = client.create_user(
IdentityStoreId=identity_store_id,
UserName="the_username",
DisplayName="display_name",
Name={"GivenName": "given_name", "FamilyName": "family_name"},
)
assert UUID(response["UserId"])
user_resp = client.describe_user(
IdentityStoreId=identity_store_id, UserId=response["UserId"]
)
assert user_resp["UserName"] == "the_username"
assert user_resp["DisplayName"] == "display_name"
assert "Name" in user_resp
assert user_resp["Name"]["GivenName"] == "given_name"
assert user_resp["Name"]["FamilyName"] == "family_name"
@mock_identitystore
def test_create_describe_full_user():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
response = client.create_user(
IdentityStoreId=identity_store_id,
UserName="the_username",
DisplayName="display_name",
Name={
"Formatted": "formatted_name",
"GivenName": "given_name",
"FamilyName": "family_name",
"MiddleName": "middle_name",
"HonorificPrefix": "The Honorable",
"HonorificSuffix": "Wisest of us all",
},
NickName="nick_name",
ProfileUrl="https://example.com",
Emails=[
{"Value": "email1@example.com", "Type": "Personal", "Primary": True},
{"Value": "email2@example.com", "Type": "Work", "Primary": False},
],
Addresses=[
{
"StreetAddress": "123 Address St.",
"Locality": "locality",
"Region": "region",
"PostalCode": "123456",
"Country": "USA",
"Formatted": "123 Address St.\nlocality, region, 123456",
"Type": "Home",
"Primary": True,
},
],
PhoneNumbers=[
{"Value": "555-456-7890", "Type": "Home", "Primary": True},
],
UserType="user_type",
Title="title",
PreferredLanguage="preferred_language",
Locale="locale",
Timezone="timezone",
)
assert UUID(response["UserId"])
user_resp = client.describe_user(
IdentityStoreId=identity_store_id, UserId=response["UserId"]
)
assert user_resp["UserName"] == "the_username"
assert user_resp["DisplayName"] == "display_name"
assert "Name" in user_resp
assert user_resp["Name"]["Formatted"] == "formatted_name"
assert user_resp["Name"]["GivenName"] == "given_name"
assert user_resp["Name"]["FamilyName"] == "family_name"
assert user_resp["Name"]["MiddleName"] == "middle_name"
assert user_resp["Name"]["HonorificPrefix"] == "The Honorable"
assert user_resp["Name"]["HonorificSuffix"] == "Wisest of us all"
assert user_resp["NickName"] == "nick_name"
assert user_resp["ProfileUrl"] == "https://example.com"
assert "Emails" in user_resp
assert len(user_resp["Emails"]) == 2
email1 = user_resp["Emails"][0]
assert email1["Value"] == "email1@example.com"
assert email1["Type"] == "Personal"
assert email1["Primary"] is True
email2 = user_resp["Emails"][1]
assert email2["Value"] == "email2@example.com"
assert email2["Type"] == "Work"
assert email2["Primary"] is False
assert "Addresses" in user_resp
assert len(user_resp["Addresses"]) == 1
assert user_resp["Addresses"][0]["StreetAddress"] == "123 Address St."
assert user_resp["Addresses"][0]["Locality"] == "locality"
assert user_resp["Addresses"][0]["Region"] == "region"
assert user_resp["Addresses"][0]["PostalCode"] == "123456"
assert user_resp["Addresses"][0]["Country"] == "USA"
assert (
user_resp["Addresses"][0]["Formatted"]
== "123 Address St.\nlocality, region, 123456"
)
assert user_resp["Addresses"][0]["Type"] == "Home"
assert user_resp["Addresses"][0]["Primary"] is True
assert "PhoneNumbers" in user_resp
assert len(user_resp["PhoneNumbers"]) == 1
assert user_resp["PhoneNumbers"][0]["Value"] == "555-456-7890"
assert user_resp["PhoneNumbers"][0]["Type"] == "Home"
assert user_resp["PhoneNumbers"][0]["Primary"] is True
assert user_resp["UserType"] == "user_type"
assert user_resp["Title"] == "title"
assert user_resp["PreferredLanguage"] == "preferred_language"
assert user_resp["Locale"] == "locale"
assert user_resp["Timezone"] == "timezone"
@mock_identitystore
def test_describe_user_doesnt_exist():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
with pytest.raises(ClientError) as exc:
client.describe_user(
IdentityStoreId=identity_store_id, UserId=str(mock_random.uuid4())
)
err = exc.value
assert err.response["Error"]["Code"] == "ResourceNotFoundException"
assert err.response["Error"]["Message"] == "USER not found."
assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert err.response["ResourceType"] == "USER"
assert err.response["Message"] == "USER not found."
assert "RequestId" in err.response
@mock_identitystore
def test_get_group_id():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
groups = {}
# Create a bunch of groups
for _ in range(1, 10):
group = __create_test_group(client, identity_store_id)
groups[group[0]] = group[1]
# Make sure we can get their ID
for name, group_id in groups.items():
response = client.get_group_id(
IdentityStoreId=identity_store_id,
AlternateIdentifier={
"UniqueAttribute": {
"AttributePath": "displayName",
"AttributeValue": name,
}
},
)
assert response["IdentityStoreId"] == identity_store_id
assert response["GroupId"] == group_id
@mock_identitystore
def test_get_group_id_does_not_exist():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
with pytest.raises(ClientError) as exc:
client.get_group_id(
IdentityStoreId=identity_store_id,
AlternateIdentifier={
"UniqueAttribute": {
"AttributePath": "displayName",
"AttributeValue": "does-not-exist",
}
},
)
err = exc.value
assert err.response["Error"]["Code"] == "ResourceNotFoundException"
assert err.response["Error"]["Message"] == "GROUP not found."
assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert err.response["ResourceType"] == "GROUP"
assert err.response["Message"] == "GROUP not found."
assert "RequestId" in err.response
@mock_identitystore
def test_list_group_memberships():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
start = 0
end = 5000
batch_size = 321
next_token = None
membership_ids = []
group_id = client.create_group(
IdentityStoreId=identity_store_id,
DisplayName="test_group",
Description="description",
)["GroupId"]
for _ in range(end):
user_id = __create_and_verify_sparse_user(client, identity_store_id)
create_response = client.create_group_membership(
IdentityStoreId=identity_store_id,
GroupId=group_id,
MemberId={"UserId": user_id},
)
membership_ids.append((create_response["MembershipId"], user_id))
for iteration in range(start, end, batch_size):
last_iteration = end - iteration <= batch_size
expected_size = batch_size if not last_iteration else end - iteration
end_index = iteration + expected_size
if next_token is not None:
list_response = client.list_group_memberships(
IdentityStoreId=identity_store_id,
GroupId=group_id,
MaxResults=batch_size,
NextToken=next_token,
)
else:
list_response = client.list_group_memberships(
IdentityStoreId=identity_store_id,
GroupId=group_id,
MaxResults=batch_size,
)
assert len(list_response["GroupMemberships"]) == expected_size
__check_membership_list_values(
list_response["GroupMemberships"], membership_ids[iteration:end_index]
)
if last_iteration:
assert "NextToken" not in list_response
else:
assert "NextToken" in list_response
next_token = list_response["NextToken"]
def __check_membership_list_values(members, expected):
assert len(members) == len(expected)
for i in range(len(expected)):
assert members[i]["MembershipId"] == expected[i][0]
assert members[i]["MemberId"]["UserId"] == expected[i][1]
@mock_identitystore
def test_delete_group():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
test_group = __create_test_group(client, identity_store_id)
assert __group_exists(client, test_group[0], identity_store_id)
resp = client.delete_group(IdentityStoreId=identity_store_id, GroupId=test_group[1])
assert "ResponseMetadata" in resp
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
assert not __group_exists(client, test_group[0], identity_store_id)
@mock_identitystore
def test_delete_group_doesnt_exist():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
bogus_id = str(uuid4())
resp = client.delete_group(IdentityStoreId=identity_store_id, GroupId=bogus_id)
assert "ResponseMetadata" in resp
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
assert not __group_exists(client, bogus_id, identity_store_id)
@mock_identitystore
def test_delete_group_membership():
client = boto3.client("identitystore", region_name="eu-west-1")
identity_store_id = get_identity_store_id()
user_id = __create_and_verify_sparse_user(client, identity_store_id)
_, group_id = __create_test_group(client, identity_store_id)
membership = client.create_group_membership(
IdentityStoreId=identity_store_id,
GroupId=group_id,
MemberId={"UserId": user_id},
)
# Verify the group membership
response = client.list_group_memberships(
IdentityStoreId=identity_store_id, GroupId=group_id
)
assert response["GroupMemberships"][0]["MemberId"]["UserId"] == user_id
client.delete_group_membership(
IdentityStoreId=identity_store_id, MembershipId=membership["MembershipId"]
)
# Verify the group membership has been removed
response = client.list_group_memberships(
IdentityStoreId=identity_store_id, GroupId=group_id
)
assert len(response["GroupMemberships"]) == 0
@mock_identitystore
def test_delete_user():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
user_id = __create_and_verify_sparse_user(client, identity_store_id)
client.delete_user(IdentityStoreId=identity_store_id, UserId=user_id)
with pytest.raises(ClientError) as exc:
client.describe_user(IdentityStoreId=identity_store_id, UserId=user_id)
err = exc.value
assert err.response["Error"]["Code"] == "ResourceNotFoundException"
@mock_identitystore
def test_delete_user_doesnt_exist():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
# This test ensures that the delete_user call does not raise an error if the user ID does not exist
client.delete_user(
IdentityStoreId=identity_store_id, UserId=str(mock_random.uuid4())
)
def __create_test_group(client, store_id: str):
rand = "".join(random.choices(string.ascii_lowercase, k=8))
group_name = f"test_group_{rand}"
create_resp = client.create_group(
IdentityStoreId=store_id,
DisplayName=group_name,
Description="description",
)
return group_name, create_resp["GroupId"]
def __group_exists(client, group_name: str, store_id: str) -> bool:
try:
client.get_group_id(
IdentityStoreId=store_id,
AlternateIdentifier={
"UniqueAttribute": {
"AttributePath": "displayName",
"AttributeValue": group_name,
}
},
)
return True
except ClientError as e:
if "ResourceNotFoundException" in str(type(e)):
return False
raise e
def __create_and_verify_sparse_user(client, store_id: str):
rand = random.choices(string.ascii_lowercase, k=8)
username = f"the_username_{rand}"
response = client.create_user(
IdentityStoreId=store_id,
UserName=username,
DisplayName=f"display_name_{rand}",
Name={"GivenName": f"given_name_{rand}", "FamilyName": f"family_name_{rand}"},
)
assert UUID(response["UserId"])
user_resp = client.describe_user(
IdentityStoreId=store_id, UserId=response["UserId"]
)
assert user_resp["UserName"] == username
return user_resp["UserId"]

View File

@ -7,6 +7,7 @@ from tests.test_redshiftdata.test_redshiftdata_constants import (
DEFAULT_ENCODING, DEFAULT_ENCODING,
HttpHeaders, HttpHeaders,
) )
from tests.helpers import match_uuid4 # noqa # pylint: disable=unused-import
CLIENT_ENDPOINT = "/" CLIENT_ENDPOINT = "/"