Implemented identitystore describe_group (#7015)

This commit is contained in:
Alexey Osheychik 2023-11-10 23:34:12 +01:00 committed by GitHub
parent d244885dbf
commit 84329158ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 17 deletions

View File

@ -14,6 +14,14 @@ from .exceptions import (
import warnings
class Group(NamedTuple):
GroupId: str
DisplayName: str
ExternalIds: List[Optional[Dict[str, str]]]
Description: str
IdentityStoreId: str
class Name(NamedTuple):
Formatted: Optional[str]
FamilyName: Optional[str]
@ -56,7 +64,7 @@ class User(NamedTuple):
class IdentityStoreData:
def __init__(self) -> None:
self.groups: Dict[str, Dict[str, str]] = {}
self.groups: Dict[str, Group] = {}
self.users: Dict[str, User] = {}
self.group_memberships: Dict[str, Any] = {}
@ -95,9 +103,7 @@ class IdentityStoreBackend(BaseBackend):
identity_store = self.__get_identity_store(identity_store_id)
matching = [
g
for g in identity_store.groups.values()
if g["DisplayName"] == display_name
g for g in identity_store.groups.values() if g.DisplayName == display_name
]
if len(matching) > 0:
raise ConflictException(
@ -106,13 +112,14 @@ class IdentityStoreBackend(BaseBackend):
)
group_id = str(mock_random.uuid4())
group_dict = {
"GroupId": group_id,
"IdentityStoreId": identity_store_id,
"DisplayName": display_name,
"Description": description,
}
identity_store.groups[group_id] = group_dict
group = Group(
group_id,
display_name,
[],
description,
identity_store_id,
)
identity_store.groups[group_id] = group
return group_id, identity_store_id
def get_group_id(
@ -127,10 +134,10 @@ class IdentityStoreBackend(BaseBackend):
):
for g in identity_store.groups.values():
if (
g["DisplayName"]
g.DisplayName
== alternate_identifier["UniqueAttribute"]["AttributeValue"]
):
return g["GroupId"], identity_store_id
return g.GroupId, identity_store_id
elif "ExternalId" in alternate_identifier:
warnings.warn("ExternalId has not been implemented.")
@ -138,6 +145,23 @@ class IdentityStoreBackend(BaseBackend):
message="GROUP not found.", resource_type="GROUP"
)
def describe_group(self, identity_store_id: str, group_id: str) -> Group:
identity_store = self.__get_identity_store(identity_store_id)
if group_id in identity_store.groups:
g = identity_store.groups[group_id]
# External Ids are not implemented
external_ids: List[Any] = []
return Group(
g.GroupId,
g.DisplayName,
external_ids,
g.Description,
identity_store_id,
)
raise ResourceNotFoundException(
message="GROUP not found.", resource_type="GROUP"
)
def delete_group(self, identity_store_id: str, group_id: str) -> None:
identity_store = self.__get_identity_store(identity_store_id)
if group_id in identity_store.groups:
@ -240,19 +264,19 @@ class IdentityStoreBackend(BaseBackend):
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore
def list_groups(
self, identity_store_id: str, filters: List[Dict[str, str]]
) -> List[Dict[str, str]]:
) -> List[Dict[str, Any]]:
identity_store = self.__get_identity_store(identity_store_id)
if filters:
if filters[0].get("AttributePath") == "DisplayName":
displayname = filters[0].get("AttributeValue")
return [
m
m._asdict()
for m in identity_store.groups.values()
if m["DisplayName"] == displayname
if m.DisplayName == displayname
]
return [m for m in identity_store.groups.values()]
return [m._asdict() for m in identity_store.groups.values()]
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore
def list_users(

View File

@ -150,6 +150,29 @@ class IdentityStoreResponse(BaseResponse):
return json.dumps(dict(Groups=groups, NextToken=next_token))
def describe_group(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
group_id = self._get_param("GroupId")
(
group_id,
display_name,
external_ids,
description,
identity_store_id,
) = self.identitystore_backend.describe_group(
identity_store_id=identity_store_id,
group_id=group_id,
)
return json.dumps(
dict(
GroupId=group_id,
DisplayName=display_name,
ExternalIds=external_ids,
Description=description,
IdentityStoreId=identity_store_id,
)
)
def list_users(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
max_results = self._get_param("MaxResults")

View File

@ -432,6 +432,7 @@ def test_list_groups():
{
"GroupId": group_id,
"DisplayName": display_name,
"ExternalIds": [],
"Description": description,
"IdentityStoreId": identity_store_id,
}
@ -476,6 +477,7 @@ def test_list_groups_filter():
expected_group = {
"GroupId": group_id,
"DisplayName": display_name,
"ExternalIds": [],
"Description": description,
"IdentityStoreId": identity_store_id,
}
@ -733,6 +735,40 @@ def test_delete_user_doesnt_exist():
)
@mock_identitystore
def test_create_describe_group() -> None:
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
group_name, group_descriprion, group_id = __create_test_group(
client, identity_store_id
)
client_response = client.describe_group(
IdentityStoreId=identity_store_id, GroupId=group_id
)
assert client_response["GroupId"] == group_id
assert client_response["DisplayName"] == group_name
assert client_response["Description"] == group_descriprion
assert client_response["IdentityStoreId"] == identity_store_id
@mock_identitystore
def test_describe_group_doesnt_exist() -> None:
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
with pytest.raises(ClientError) as exc:
client.describe_group(IdentityStoreId=identity_store_id, GroupId=str(uuid4()))
err = exc.value
assert err.response["Error"]["Code"] == "ResourceNotFoundException"
assert err.response["Error"]["Message"] == "GROUP not found."
assert err.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert err.response["ResourceType"] == "GROUP"
assert err.response["Message"] == "GROUP not found."
assert "RequestId" in err.response
def __create_test_group(client, store_id: str):
rand = "".join(random.choices(string.ascii_lowercase, k=8))
group_name = f"test_group_{rand}"