From 76a743c56e0c260b9241f02f6edeae06ed2e5f16 Mon Sep 17 00:00:00 2001 From: Joel McCoy Date: Mon, 14 Aug 2023 04:52:11 -0500 Subject: [PATCH] Implemented IdentityStore list_groups function (#6644) --- moto/identitystore/models.py | 23 ++++ moto/identitystore/responses.py | 14 +++ .../test_identitystore/test_identitystore.py | 104 +++++++++++++++++- 3 files changed, 135 insertions(+), 6 deletions(-) diff --git a/moto/identitystore/models.py b/moto/identitystore/models.py index 07decf2c8..456c1f7fc 100644 --- a/moto/identitystore/models.py +++ b/moto/identitystore/models.py @@ -71,6 +71,12 @@ class IdentityStoreBackend(BaseBackend): "limit_default": 100, "unique_attribute": "MembershipId", }, + "list_groups": { + "input_token": "next_token", + "limit_key": "max_results", + "limit_default": 100, + "unique_attribute": "GroupId", + }, } def __init__(self, region_name: str, account_id: str) -> None: @@ -225,6 +231,23 @@ class IdentityStoreBackend(BaseBackend): if m["GroupId"] == group_id ] + @paginate(pagination_model=PAGINATION_MODEL) # type: ignore + def list_groups( + self, identity_store_id: str, filters: List[Dict[str, str]] + ) -> List[Dict[str, str]]: + identity_store = self.__get_identity_store(identity_store_id) + + if filters: + if filters[0].get("AttributePath") == "DisplayName": + displayname = filters[0].get("AttributeValue") + return [ + m + for m in identity_store.groups.values() + if m["DisplayName"] == displayname + ] + + return [m for m in identity_store.groups.values()] + def delete_group_membership( self, identity_store_id: str, membership_id: str ) -> None: diff --git a/moto/identitystore/responses.py b/moto/identitystore/responses.py index 4467709ad..ce18dd06a 100644 --- a/moto/identitystore/responses.py +++ b/moto/identitystore/responses.py @@ -136,6 +136,20 @@ class IdentityStoreResponse(BaseResponse): dict(GroupMemberships=group_memberships, NextToken=next_token) ) + def list_groups(self) -> str: + identity_store_id = self._get_param("IdentityStoreId") + max_results = self._get_param("MaxResults") + next_token = self._get_param("NextToken") + filters = self._get_param("Filters") + (groups, next_token,) = self.identitystore_backend.list_groups( + identity_store_id=identity_store_id, + max_results=max_results, + next_token=next_token, + filters=filters, + ) + + return json.dumps(dict(Groups=groups, NextToken=next_token)) + def delete_group(self) -> str: identity_store_id = self._get_param("IdentityStoreId") group_id = self._get_param("GroupId") diff --git a/tests/test_identitystore/test_identitystore.py b/tests/test_identitystore/test_identitystore.py index ad7520d37..73c626d15 100644 --- a/tests/test_identitystore/test_identitystore.py +++ b/tests/test_identitystore/test_identitystore.py @@ -16,7 +16,8 @@ from moto.moto_api._internal import mock_random def get_identity_store_id() -> str: - return f"d-{random.choices(string.ascii_lowercase, k=10)}" + rand = "".join(random.choices(string.ascii_lowercase, k=10)) + return f"d-{rand}" @mock_identitystore @@ -371,7 +372,7 @@ def test_get_group_id(): # Create a bunch of groups for _ in range(1, 10): group = __create_test_group(client, identity_store_id) - groups[group[0]] = group[1] + groups[group[0]] = group[2] # Make sure we can get their ID for name, group_id in groups.items(): @@ -413,6 +414,96 @@ def test_get_group_id_does_not_exist(): assert "RequestId" in err.response +@mock_identitystore +def test_list_groups(): + client = boto3.client("identitystore", region_name="us-east-2") + identity_store_id = get_identity_store_id() + + start = 0 + end = 266 + batch_size = 100 + next_token = None + + expected_groups = list() + for _ in range(end): + display_name, description, group_id = __create_test_group( + client=client, store_id=identity_store_id + ) + expected_groups.append( + { + "GroupId": group_id, + "DisplayName": display_name, + "Description": description, + "IdentityStoreId": identity_store_id, + } + ) + + groups = list() + for iteration in range(start, end, batch_size): + last_iteration = end - iteration <= batch_size + expected_size = batch_size if not last_iteration else end - iteration + + if next_token is not None: + list_response = client.list_groups( + IdentityStoreId=identity_store_id, + MaxResults=batch_size, + NextToken=next_token, + ) + else: + list_response = client.list_groups( + IdentityStoreId=identity_store_id, + MaxResults=batch_size, + ) + + assert len(list_response["Groups"]) == expected_size + groups.extend(list_response["Groups"]) + if last_iteration: + assert "NextToken" not in list_response + else: + assert "NextToken" in list_response + next_token = list_response["NextToken"] + + assert groups == expected_groups + + +@mock_identitystore +def test_list_groups_filter(): + client = boto3.client("identitystore", region_name="us-east-2") + identity_store_id = get_identity_store_id() + + display_name, description, group_id = __create_test_group( + client=client, store_id=identity_store_id + ) + expected_group = { + "GroupId": group_id, + "DisplayName": display_name, + "Description": description, + "IdentityStoreId": identity_store_id, + } + + # Create a second test group to see if it is not returned + __create_test_group(client=client, store_id=identity_store_id) + + groups = client.list_groups( + IdentityStoreId=identity_store_id, + Filters=[ + {"AttributePath": "DisplayName", "AttributeValue": display_name}, + ], + )["Groups"] + + assert len(groups) == 1 + assert groups[0] == expected_group + + no_groups = client.list_groups( + IdentityStoreId=identity_store_id, + Filters=[ + {"AttributePath": "DisplayName", "AttributeValue": "non_existant_group"}, + ], + )["Groups"] + + assert len(no_groups) == 0 + + @mock_identitystore def test_list_group_memberships(): client = boto3.client("identitystore", region_name="us-east-2") @@ -484,7 +575,7 @@ def test_delete_group(): test_group = __create_test_group(client, identity_store_id) assert __group_exists(client, test_group[0], identity_store_id) - resp = client.delete_group(IdentityStoreId=identity_store_id, GroupId=test_group[1]) + resp = client.delete_group(IdentityStoreId=identity_store_id, GroupId=test_group[2]) assert "ResponseMetadata" in resp assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 @@ -510,7 +601,7 @@ def test_delete_group_membership(): client = boto3.client("identitystore", region_name="eu-west-1") identity_store_id = get_identity_store_id() user_id = __create_and_verify_sparse_user(client, identity_store_id) - _, group_id = __create_test_group(client, identity_store_id) + _, _, group_id = __create_test_group(client, identity_store_id) membership = client.create_group_membership( IdentityStoreId=identity_store_id, @@ -563,14 +654,15 @@ def test_delete_user_doesnt_exist(): def __create_test_group(client, store_id: str): rand = "".join(random.choices(string.ascii_lowercase, k=8)) group_name = f"test_group_{rand}" + description = f"random_description_{rand}" create_resp = client.create_group( IdentityStoreId=store_id, DisplayName=group_name, - Description="description", + Description=description, ) - return group_name, create_resp["GroupId"] + return group_name, description, create_resp["GroupId"] def __group_exists(client, group_name: str, store_id: str) -> bool: