cognito-idp - pagination for list_groups and list_users_in_group. (#5292)

This commit is contained in:
bspot 2022-07-26 01:54:50 +02:00 committed by GitHub
parent 40eefb92d7
commit cc8f8fd805
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 132 additions and 6 deletions

View File

@ -1041,10 +1041,11 @@ class CognitoIdpBackend(BaseBackend):
return user_pool.groups[group_name]
@paginate(pagination_model=PAGINATION_MODEL)
def list_groups(self, user_pool_id):
user_pool = self.describe_user_pool(user_pool_id)
return user_pool.groups.values()
return list(user_pool.groups.values())
def delete_group(self, user_pool_id, group_name):
user_pool = self.describe_user_pool(user_pool_id)
@ -1072,9 +1073,11 @@ class CognitoIdpBackend(BaseBackend):
group.users.add(user)
user.groups.add(group)
@paginate(pagination_model=PAGINATION_MODEL)
def list_users_in_group(self, user_pool_id, group_name):
user_pool = self.describe_user_pool(user_pool_id)
group = self.get_group(user_pool_id, group_name)
return list(group.users)
return list(filter(lambda user: user in group.users, user_pool.users.values()))
def admin_list_groups_for_user(self, user_pool_id, username):
user = self.admin_get_user(user_pool_id, username)

View File

@ -249,8 +249,15 @@ class CognitoIdpResponse(BaseResponse):
def list_groups(self):
user_pool_id = self._get_param("UserPoolId")
groups = self.backend.list_groups(user_pool_id)
return json.dumps({"Groups": [group.to_json() for group in groups]})
limit = self._get_param("Limit")
token = self._get_param("NextToken")
groups, token = self.backend.list_groups(
user_pool_id, limit=limit, next_token=token
)
response = {"Groups": [group.to_json() for group in groups]}
if token:
response["NextToken"] = token
return json.dumps(response)
def delete_group(self):
group_name = self._get_param("GroupName")
@ -283,8 +290,15 @@ class CognitoIdpResponse(BaseResponse):
def list_users_in_group(self):
user_pool_id = self._get_param("UserPoolId")
group_name = self._get_param("GroupName")
users = self.backend.list_users_in_group(user_pool_id, group_name)
return json.dumps({"Users": [user.to_json(extended=True) for user in users]})
limit = self._get_param("Limit")
token = self._get_param("NextToken")
users, token = self.backend.list_users_in_group(
user_pool_id, group_name, limit=limit, next_token=token
)
response = {"Users": [user.to_json(extended=True) for user in users]}
if token:
response["NextToken"] = token
return json.dumps(response)
def admin_list_groups_for_user(self):
username = self._get_param("Username")

View File

@ -37,6 +37,18 @@ PAGINATION_MODEL = {
"limit_default": 60,
"unique_attribute": "id",
},
"list_groups": {
"input_token": "next_token",
"limit_key": "limit",
"limit_default": 60,
"unique_attribute": "group_name",
},
"list_users_in_group": {
"input_token": "next_token",
"limit_key": "limit",
"limit_default": 60,
"unique_attribute": "id",
},
}

View File

@ -1524,6 +1524,45 @@ def test_list_groups():
result["Groups"][0]["GroupName"].should.equal(group_name)
@mock_cognitoidp
def test_list_groups_returns_pagination_tokens():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
# Given 10 groups
group_count = 10
for _ in range(group_count):
conn.create_group(UserPoolId=user_pool_id, GroupName=str(uuid.uuid4()))
max_results = 5
result = conn.list_groups(UserPoolId=user_pool_id, Limit=max_results)
result["Groups"].should.have.length_of(max_results)
result.should.have.key("NextToken")
next_token = result["NextToken"]
result_2 = conn.list_groups(
UserPoolId=user_pool_id, Limit=max_results, NextToken=next_token
)
result_2["Groups"].should.have.length_of(max_results)
result_2.shouldnt.have.key("NextToken")
@mock_cognitoidp
def test_list_groups_when_limit_more_than_total_items():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
# Given 10 users
group_count = 10
for _ in range(group_count):
conn.create_group(UserPoolId=user_pool_id, GroupName=str(uuid.uuid4()))
max_results = group_count + 5
result = conn.list_groups(UserPoolId=user_pool_id, Limit=max_results)
result["Groups"].should.have.length_of(group_count)
result.shouldnt.have.key("NextToken")
@mock_cognitoidp
def test_delete_group():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1644,6 +1683,64 @@ def test_list_users_in_group_ignores_deleted_user():
result["Users"][0]["Username"].should.equal(username2)
@mock_cognitoidp
def test_list_users_in_group_returns_pagination_tokens():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
# Given 10 users
usernames = [str(uuid.uuid4()) for _ in range(10)]
for username in usernames:
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(
UserPoolId=user_pool_id, Username=username, GroupName=group_name
)
max_results = 5
result = conn.list_users_in_group(
UserPoolId=user_pool_id, GroupName=group_name, Limit=max_results
)
result["Users"].should.have.length_of(max_results)
result.should.have.key("NextToken")
next_token = result["NextToken"]
result_2 = conn.list_users_in_group(
UserPoolId=user_pool_id,
GroupName=group_name,
Limit=max_results,
NextToken=next_token,
)
result_2["Users"].should.have.length_of(max_results)
result_2.shouldnt.have.key("NextToken")
@mock_cognitoidp
def test_list_users_in_group_when_limit_more_than_total_items():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
# Given 10 users
usernames = [str(uuid.uuid4()) for _ in range(10)]
for username in usernames:
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(
UserPoolId=user_pool_id, Username=username, GroupName=group_name
)
max_results = len(usernames) + 5
result = conn.list_users_in_group(
UserPoolId=user_pool_id, GroupName=group_name, Limit=max_results
)
result["Users"].should.have.length_of(len(usernames))
result.shouldnt.have.key("NextToken")
@mock_cognitoidp
def test_admin_list_groups_for_user():
conn = boto3.client("cognito-idp", "us-west-2")