Implemented IdentityStore list_groups function (#6644)

This commit is contained in:
Joel McCoy 2023-08-14 04:52:11 -05:00 committed by GitHub
parent 0520df02c1
commit 76a743c56e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 135 additions and 6 deletions

View File

@ -71,6 +71,12 @@ class IdentityStoreBackend(BaseBackend):
"limit_default": 100,
"unique_attribute": "MembershipId",
},
"list_groups": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "GroupId",
},
}
def __init__(self, region_name: str, account_id: str) -> None:
@ -225,6 +231,23 @@ class IdentityStoreBackend(BaseBackend):
if m["GroupId"] == group_id
]
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore
def list_groups(
self, identity_store_id: str, filters: List[Dict[str, str]]
) -> List[Dict[str, str]]:
identity_store = self.__get_identity_store(identity_store_id)
if filters:
if filters[0].get("AttributePath") == "DisplayName":
displayname = filters[0].get("AttributeValue")
return [
m
for m in identity_store.groups.values()
if m["DisplayName"] == displayname
]
return [m for m in identity_store.groups.values()]
def delete_group_membership(
self, identity_store_id: str, membership_id: str
) -> None:

View File

@ -136,6 +136,20 @@ class IdentityStoreResponse(BaseResponse):
dict(GroupMemberships=group_memberships, NextToken=next_token)
)
def list_groups(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken")
filters = self._get_param("Filters")
(groups, next_token,) = self.identitystore_backend.list_groups(
identity_store_id=identity_store_id,
max_results=max_results,
next_token=next_token,
filters=filters,
)
return json.dumps(dict(Groups=groups, NextToken=next_token))
def delete_group(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
group_id = self._get_param("GroupId")

View File

@ -16,7 +16,8 @@ from moto.moto_api._internal import mock_random
def get_identity_store_id() -> str:
return f"d-{random.choices(string.ascii_lowercase, k=10)}"
rand = "".join(random.choices(string.ascii_lowercase, k=10))
return f"d-{rand}"
@mock_identitystore
@ -371,7 +372,7 @@ def test_get_group_id():
# Create a bunch of groups
for _ in range(1, 10):
group = __create_test_group(client, identity_store_id)
groups[group[0]] = group[1]
groups[group[0]] = group[2]
# Make sure we can get their ID
for name, group_id in groups.items():
@ -413,6 +414,96 @@ def test_get_group_id_does_not_exist():
assert "RequestId" in err.response
@mock_identitystore
def test_list_groups():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
start = 0
end = 266
batch_size = 100
next_token = None
expected_groups = list()
for _ in range(end):
display_name, description, group_id = __create_test_group(
client=client, store_id=identity_store_id
)
expected_groups.append(
{
"GroupId": group_id,
"DisplayName": display_name,
"Description": description,
"IdentityStoreId": identity_store_id,
}
)
groups = list()
for iteration in range(start, end, batch_size):
last_iteration = end - iteration <= batch_size
expected_size = batch_size if not last_iteration else end - iteration
if next_token is not None:
list_response = client.list_groups(
IdentityStoreId=identity_store_id,
MaxResults=batch_size,
NextToken=next_token,
)
else:
list_response = client.list_groups(
IdentityStoreId=identity_store_id,
MaxResults=batch_size,
)
assert len(list_response["Groups"]) == expected_size
groups.extend(list_response["Groups"])
if last_iteration:
assert "NextToken" not in list_response
else:
assert "NextToken" in list_response
next_token = list_response["NextToken"]
assert groups == expected_groups
@mock_identitystore
def test_list_groups_filter():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
display_name, description, group_id = __create_test_group(
client=client, store_id=identity_store_id
)
expected_group = {
"GroupId": group_id,
"DisplayName": display_name,
"Description": description,
"IdentityStoreId": identity_store_id,
}
# Create a second test group to see if it is not returned
__create_test_group(client=client, store_id=identity_store_id)
groups = client.list_groups(
IdentityStoreId=identity_store_id,
Filters=[
{"AttributePath": "DisplayName", "AttributeValue": display_name},
],
)["Groups"]
assert len(groups) == 1
assert groups[0] == expected_group
no_groups = client.list_groups(
IdentityStoreId=identity_store_id,
Filters=[
{"AttributePath": "DisplayName", "AttributeValue": "non_existant_group"},
],
)["Groups"]
assert len(no_groups) == 0
@mock_identitystore
def test_list_group_memberships():
client = boto3.client("identitystore", region_name="us-east-2")
@ -484,7 +575,7 @@ def test_delete_group():
test_group = __create_test_group(client, identity_store_id)
assert __group_exists(client, test_group[0], identity_store_id)
resp = client.delete_group(IdentityStoreId=identity_store_id, GroupId=test_group[1])
resp = client.delete_group(IdentityStoreId=identity_store_id, GroupId=test_group[2])
assert "ResponseMetadata" in resp
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
@ -510,7 +601,7 @@ def test_delete_group_membership():
client = boto3.client("identitystore", region_name="eu-west-1")
identity_store_id = get_identity_store_id()
user_id = __create_and_verify_sparse_user(client, identity_store_id)
_, group_id = __create_test_group(client, identity_store_id)
_, _, group_id = __create_test_group(client, identity_store_id)
membership = client.create_group_membership(
IdentityStoreId=identity_store_id,
@ -563,14 +654,15 @@ def test_delete_user_doesnt_exist():
def __create_test_group(client, store_id: str):
rand = "".join(random.choices(string.ascii_lowercase, k=8))
group_name = f"test_group_{rand}"
description = f"random_description_{rand}"
create_resp = client.create_group(
IdentityStoreId=store_id,
DisplayName=group_name,
Description="description",
Description=description,
)
return group_name, create_resp["GroupId"]
return group_name, description, create_resp["GroupId"]
def __group_exists(client, group_name: str, store_id: str) -> bool: