IdentityStore: list_users() (#6719)

This commit is contained in:
Joel McCoy 2023-08-25 03:01:43 -05:00 committed by GitHub
parent 6eb16e8038
commit a29f556358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 7 deletions

View File

@ -77,6 +77,12 @@ class IdentityStoreBackend(BaseBackend):
"limit_default": 100,
"unique_attribute": "GroupId",
},
"list_users": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"unique_attribute": "UserId",
},
}
def __init__(self, region_name: str, account_id: str) -> None:
@ -248,6 +254,32 @@ class IdentityStoreBackend(BaseBackend):
return [m for m in identity_store.groups.values()]
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore
def list_users(
self, identity_store_id: str, filters: List[Dict[str, str]]
) -> List[Dict[str, str]]:
identity_store = self.__get_identity_store(identity_store_id)
users = []
if filters:
if filters[0].get("AttributePath") == "UserName":
username = filters[0].get("AttributeValue")
for m in identity_store.users.values():
if m.UserName == username:
user = m._asdict()
if user.get("Name"):
user["Name"] = m.Name._asdict() # type: ignore
users.append(user)
return users
for m in identity_store.users.values():
user = m._asdict()
if user.get("Name"):
user["Name"] = m.Name._asdict() # type: ignore
users.append(user)
return users
def delete_group_membership(
self, identity_store_id: str, membership_id: str
) -> None:

View File

@ -150,6 +150,20 @@ class IdentityStoreResponse(BaseResponse):
return json.dumps(dict(Groups=groups, NextToken=next_token))
def list_users(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken")
filters = self._get_param("Filters")
(users, next_token,) = self.identitystore_backend.list_users(
identity_store_id=identity_store_id,
max_results=max_results,
next_token=next_token,
filters=filters,
)
return json.dumps(dict(Users=users, NextToken=next_token))
def delete_group(self) -> str:
identity_store_id = self._get_param("IdentityStoreId")
group_id = self._get_param("GroupId")

View File

@ -91,7 +91,7 @@ def test_create_group_membership():
Description="description",
)["GroupId"]
user_id = __create_and_verify_sparse_user(client, identity_store_id)
user_id = __create_and_verify_sparse_user(client, identity_store_id)["UserId"]
create_response = client.create_group_membership(
IdentityStoreId=identity_store_id,
@ -376,7 +376,6 @@ def test_get_group_id():
# Make sure we can get their ID
for name, group_id in groups.items():
response = client.get_group_id(
IdentityStoreId=identity_store_id,
AlternateIdentifier={
@ -522,7 +521,7 @@ def test_list_group_memberships():
)["GroupId"]
for _ in range(end):
user_id = __create_and_verify_sparse_user(client, identity_store_id)
user_id = __create_and_verify_sparse_user(client, identity_store_id)["UserId"]
create_response = client.create_group_membership(
IdentityStoreId=identity_store_id,
GroupId=group_id,
@ -567,6 +566,89 @@ def __check_membership_list_values(members, expected):
assert members[i]["MemberId"]["UserId"] == expected[i][1]
@mock_identitystore
def test_list_users():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
start = 0
end = 240
batch_size = 100
next_token = None
expected_users = list()
for _ in range(end):
dummy_user = __create_and_verify_sparse_user(client, identity_store_id)
expected_users.append(dummy_user)
users = list()
for iteration in range(start, end, batch_size):
last_iteration = end - iteration <= batch_size
expected_size = batch_size if not last_iteration else end - iteration
if next_token is not None:
list_response = client.list_users(
IdentityStoreId=identity_store_id,
MaxResults=batch_size,
NextToken=next_token,
)
else:
list_response = client.list_users(
IdentityStoreId=identity_store_id,
MaxResults=batch_size,
)
assert len(list_response["Users"]) == expected_size
users.extend(list_response["Users"])
if last_iteration:
assert "NextToken" not in list_response
else:
assert "NextToken" in list_response
next_token = list_response["NextToken"]
assert users == expected_users
@mock_identitystore
def test_list_users_filter():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
client.create_user(
IdentityStoreId=identity_store_id,
UserName="test_username_1",
DisplayName="test_display_name_1",
Name={"GivenName": "given_name", "FamilyName": "family_name"},
)
# Create a second user to see if it is not returned
client.create_user(
IdentityStoreId=identity_store_id,
UserName="test_username_2",
DisplayName="test_display_name_2",
Name={"GivenName": "given_name", "FamilyName": "family_name"},
)
users = client.list_users(
IdentityStoreId=identity_store_id,
Filters=[
{"AttributePath": "UserName", "AttributeValue": "test_username_1"},
],
)["Users"]
assert len(users) == 1
assert users[0]["UserName"] == "test_username_1"
no_users = client.list_users(
IdentityStoreId=identity_store_id,
Filters=[
{"AttributePath": "UserName", "AttributeValue": "non_existant_user"},
],
)["Users"]
assert len(no_users) == 0
@mock_identitystore
def test_delete_group():
client = boto3.client("identitystore", region_name="us-east-2")
@ -600,7 +682,7 @@ def test_delete_group_doesnt_exist():
def test_delete_group_membership():
client = boto3.client("identitystore", region_name="eu-west-1")
identity_store_id = get_identity_store_id()
user_id = __create_and_verify_sparse_user(client, identity_store_id)
user_id = __create_and_verify_sparse_user(client, identity_store_id)["UserId"]
_, _, group_id = __create_test_group(client, identity_store_id)
membership = client.create_group_membership(
@ -630,7 +712,7 @@ def test_delete_group_membership():
def test_delete_user():
client = boto3.client("identitystore", region_name="us-east-2")
identity_store_id = get_identity_store_id()
user_id = __create_and_verify_sparse_user(client, identity_store_id)
user_id = __create_and_verify_sparse_user(client, identity_store_id)["UserId"]
client.delete_user(IdentityStoreId=identity_store_id, UserId=user_id)
@ -684,7 +766,8 @@ def __group_exists(client, group_name: str, store_id: str) -> bool:
def __create_and_verify_sparse_user(client, store_id: str):
rand = random.choices(string.ascii_lowercase, k=8)
"""Creates a user, verifies it is unique and returns the response of describe_user"""
rand = "".join(random.choices(string.ascii_lowercase, k=8))
username = f"the_username_{rand}"
response = client.create_user(
IdentityStoreId=store_id,
@ -699,4 +782,7 @@ def __create_and_verify_sparse_user(client, store_id: str):
)
assert user_resp["UserName"] == username
return user_resp["UserId"]
del user_resp[
"ResponseMetadata"
] # strip response metadata and just return user info
return user_resp