Adds limiting/pagination to cognitoidp list_* functions
This commit is contained in:
parent
71a054af92
commit
aa4be6fcad
@ -1,6 +1,7 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
@ -20,6 +21,43 @@ UserStatus = {
|
||||
}
|
||||
|
||||
|
||||
def paginate(limit, start_arg="next_token", limit_arg="max_results"):
|
||||
"""Returns a limited result list, and an offset into list of remaining items
|
||||
|
||||
Takes the next_token, and max_results kwargs given to a function and handles
|
||||
the slicing of the results. The kwarg `next_token` is the offset into the
|
||||
list to begin slicing from. `max_results` is the size of the result required
|
||||
|
||||
If the max_results is not supplied then the `limit` parameter is used as a
|
||||
default
|
||||
|
||||
:param limit_arg: the name of argument in the decorated function that
|
||||
controls amount of items returned
|
||||
:param start_arg: the name of the argument in the decorated that provides
|
||||
the starting offset
|
||||
:param limit: A default maximum items to return
|
||||
:return: a tuple containing a list of items, and the offset into the list
|
||||
"""
|
||||
default_start = 0
|
||||
|
||||
def outer_wrapper(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
# setup
|
||||
start = int(default_start if kwargs.get(start_arg) is None else kwargs[start_arg])
|
||||
stop = int(limit if kwargs.get(limit_arg) is None else kwargs[limit_arg])
|
||||
end = start + stop
|
||||
# call
|
||||
result = func(*args, **kwargs)
|
||||
# modify
|
||||
results = list(result)
|
||||
limited_results = results[start: end]
|
||||
next_token = end if end < len(results) else None
|
||||
return limited_results, next_token
|
||||
return wrapper
|
||||
return outer_wrapper
|
||||
|
||||
|
||||
class CognitoIdpUserPool(BaseModel):
|
||||
|
||||
def __init__(self, region, name, extended_config):
|
||||
@ -242,7 +280,8 @@ class CognitoIdpBackend(BaseBackend):
|
||||
self.user_pools[user_pool.id] = user_pool
|
||||
return user_pool
|
||||
|
||||
def list_user_pools(self):
|
||||
@paginate(60)
|
||||
def list_user_pools(self, max_results=None, next_token=None):
|
||||
return self.user_pools.values()
|
||||
|
||||
def describe_user_pool(self, user_pool_id):
|
||||
@ -289,7 +328,8 @@ class CognitoIdpBackend(BaseBackend):
|
||||
user_pool.clients[user_pool_client.id] = user_pool_client
|
||||
return user_pool_client
|
||||
|
||||
def list_user_pool_clients(self, user_pool_id):
|
||||
@paginate(60)
|
||||
def list_user_pool_clients(self, user_pool_id, max_results=None, next_token=None):
|
||||
user_pool = self.user_pools.get(user_pool_id)
|
||||
if not user_pool:
|
||||
raise ResourceNotFoundError(user_pool_id)
|
||||
@ -339,7 +379,8 @@ class CognitoIdpBackend(BaseBackend):
|
||||
user_pool.identity_providers[name] = identity_provider
|
||||
return identity_provider
|
||||
|
||||
def list_identity_providers(self, user_pool_id):
|
||||
@paginate(60)
|
||||
def list_identity_providers(self, user_pool_id, max_results=None, next_token=None):
|
||||
user_pool = self.user_pools.get(user_pool_id)
|
||||
if not user_pool:
|
||||
raise ResourceNotFoundError(user_pool_id)
|
||||
@ -387,7 +428,8 @@ class CognitoIdpBackend(BaseBackend):
|
||||
|
||||
return user_pool.users[username]
|
||||
|
||||
def list_users(self, user_pool_id):
|
||||
@paginate(60, "pagination_token", "limit")
|
||||
def list_users(self, user_pool_id, pagination_token=None, limit=None):
|
||||
user_pool = self.user_pools.get(user_pool_id)
|
||||
if not user_pool:
|
||||
raise ResourceNotFoundError(user_pool_id)
|
||||
|
@ -22,10 +22,17 @@ class CognitoIdpResponse(BaseResponse):
|
||||
})
|
||||
|
||||
def list_user_pools(self):
|
||||
user_pools = cognitoidp_backends[self.region].list_user_pools()
|
||||
return json.dumps({
|
||||
"UserPools": [user_pool.to_json() for user_pool in user_pools]
|
||||
})
|
||||
max_results = self._get_param("MaxResults")
|
||||
next_token = self._get_param("NextToken", "0")
|
||||
user_pools, next_token = cognitoidp_backends[self.region].list_user_pools(
|
||||
max_results=max_results, next_token=next_token
|
||||
)
|
||||
response = {
|
||||
"UserPools": [user_pool.to_json() for user_pool in user_pools],
|
||||
}
|
||||
if next_token:
|
||||
response["NextToken"] = str(next_token)
|
||||
return json.dumps(response)
|
||||
|
||||
def describe_user_pool(self):
|
||||
user_pool_id = self._get_param("UserPoolId")
|
||||
@ -72,10 +79,16 @@ class CognitoIdpResponse(BaseResponse):
|
||||
|
||||
def list_user_pool_clients(self):
|
||||
user_pool_id = self._get_param("UserPoolId")
|
||||
user_pool_clients = cognitoidp_backends[self.region].list_user_pool_clients(user_pool_id)
|
||||
return json.dumps({
|
||||
max_results = self._get_param("MaxResults")
|
||||
next_token = self._get_param("NextToken", "0")
|
||||
user_pool_clients, next_token = cognitoidp_backends[self.region].list_user_pool_clients(user_pool_id,
|
||||
max_results=max_results, next_token=next_token)
|
||||
response = {
|
||||
"UserPoolClients": [user_pool_client.to_json() for user_pool_client in user_pool_clients]
|
||||
})
|
||||
}
|
||||
if next_token:
|
||||
response["NextToken"] = str(next_token)
|
||||
return json.dumps(response)
|
||||
|
||||
def describe_user_pool_client(self):
|
||||
user_pool_id = self._get_param("UserPoolId")
|
||||
@ -110,10 +123,17 @@ class CognitoIdpResponse(BaseResponse):
|
||||
|
||||
def list_identity_providers(self):
|
||||
user_pool_id = self._get_param("UserPoolId")
|
||||
identity_providers = cognitoidp_backends[self.region].list_identity_providers(user_pool_id)
|
||||
return json.dumps({
|
||||
max_results = self._get_param("MaxResults")
|
||||
next_token = self._get_param("NextToken", "0")
|
||||
identity_providers, next_token = cognitoidp_backends[self.region].list_identity_providers(
|
||||
user_pool_id, max_results=max_results, next_token=next_token
|
||||
)
|
||||
response = {
|
||||
"Providers": [identity_provider.to_json() for identity_provider in identity_providers]
|
||||
})
|
||||
}
|
||||
if next_token:
|
||||
response["NextToken"] = str(next_token)
|
||||
return json.dumps(response)
|
||||
|
||||
def describe_identity_provider(self):
|
||||
user_pool_id = self._get_param("UserPoolId")
|
||||
@ -155,10 +175,15 @@ class CognitoIdpResponse(BaseResponse):
|
||||
|
||||
def list_users(self):
|
||||
user_pool_id = self._get_param("UserPoolId")
|
||||
users = cognitoidp_backends[self.region].list_users(user_pool_id)
|
||||
return json.dumps({
|
||||
"Users": [user.to_json(extended=True) for user in users]
|
||||
})
|
||||
limit = self._get_param("Limit")
|
||||
token = self._get_param("PaginationToken")
|
||||
users, token = cognitoidp_backends[self.region].list_users(user_pool_id,
|
||||
limit=limit,
|
||||
pagination_token=token)
|
||||
response = {"Users": [user.to_json(extended=True) for user in users]}
|
||||
if token:
|
||||
response["PaginationToken"] = str(token)
|
||||
return json.dumps(response)
|
||||
|
||||
def admin_disable_user(self):
|
||||
user_pool_id = self._get_param("UserPoolId")
|
||||
|
@ -41,6 +41,56 @@ def test_list_user_pools():
|
||||
result["UserPools"][0]["Name"].should.equal(name)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pools_returns_max_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
# Given 10 user pools
|
||||
pool_count = 10
|
||||
for i in range(pool_count):
|
||||
conn.create_user_pool(PoolName=str(uuid.uuid4()))
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_user_pools(MaxResults=max_results)
|
||||
result["UserPools"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pools_returns_next_tokens():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
# Given 10 user pool clients
|
||||
pool_count = 10
|
||||
for i in range(pool_count):
|
||||
conn.create_user_pool(PoolName=str(uuid.uuid4()))
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_user_pools(MaxResults=max_results)
|
||||
result["UserPools"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
next_token = result["NextToken"]
|
||||
result_2 = conn.list_user_pools(MaxResults=max_results, NextToken=next_token)
|
||||
result_2["UserPools"].should.have.length_of(max_results)
|
||||
result_2.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pools_when_max_items_more_than_total_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
||||
# Given 10 user pool clients
|
||||
pool_count = 10
|
||||
for i in range(pool_count):
|
||||
conn.create_user_pool(PoolName=str(uuid.uuid4()))
|
||||
|
||||
max_results = pool_count + 5
|
||||
result = conn.list_user_pools(MaxResults=max_results)
|
||||
result["UserPools"].should.have.length_of(pool_count)
|
||||
result.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_describe_user_pool():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
@ -140,6 +190,67 @@ def test_list_user_pool_clients():
|
||||
result["UserPoolClients"][0]["ClientName"].should.equal(client_name)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pool_clients_returns_max_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 user pool clients
|
||||
client_count = 10
|
||||
for i in range(client_count):
|
||||
client_name = str(uuid.uuid4())
|
||||
conn.create_user_pool_client(UserPoolId=user_pool_id,
|
||||
ClientName=client_name)
|
||||
max_results = 5
|
||||
result = conn.list_user_pool_clients(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results)
|
||||
result["UserPoolClients"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pool_clients_returns_next_tokens():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 user pool clients
|
||||
client_count = 10
|
||||
for i in range(client_count):
|
||||
client_name = str(uuid.uuid4())
|
||||
conn.create_user_pool_client(UserPoolId=user_pool_id,
|
||||
ClientName=client_name)
|
||||
max_results = 5
|
||||
result = conn.list_user_pool_clients(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results)
|
||||
result["UserPoolClients"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
next_token = result["NextToken"]
|
||||
result_2 = conn.list_user_pool_clients(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results,
|
||||
NextToken=next_token)
|
||||
result_2["UserPoolClients"].should.have.length_of(max_results)
|
||||
result_2.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_user_pool_clients_when_max_items_more_than_total_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 user pool clients
|
||||
client_count = 10
|
||||
for i in range(client_count):
|
||||
client_name = str(uuid.uuid4())
|
||||
conn.create_user_pool_client(UserPoolId=user_pool_id,
|
||||
ClientName=client_name)
|
||||
max_results = client_count + 5
|
||||
result = conn.list_user_pool_clients(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results)
|
||||
result["UserPoolClients"].should.have.length_of(client_count)
|
||||
result.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_describe_user_pool_client():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
@ -264,6 +375,83 @@ def test_list_identity_providers():
|
||||
result["Providers"][0]["ProviderType"].should.equal(provider_type)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_identity_providers_returns_max_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 identity providers linked to a user pool
|
||||
identity_provider_count = 10
|
||||
for i in range(identity_provider_count):
|
||||
provider_name = str(uuid.uuid4())
|
||||
provider_type = "Facebook"
|
||||
conn.create_identity_provider(
|
||||
UserPoolId=user_pool_id,
|
||||
ProviderName=provider_name,
|
||||
ProviderType=provider_type,
|
||||
ProviderDetails={},
|
||||
)
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_identity_providers(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results)
|
||||
result["Providers"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_identity_providers_returns_next_tokens():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 identity providers linked to a user pool
|
||||
identity_provider_count = 10
|
||||
for i in range(identity_provider_count):
|
||||
provider_name = str(uuid.uuid4())
|
||||
provider_type = "Facebook"
|
||||
conn.create_identity_provider(
|
||||
UserPoolId=user_pool_id,
|
||||
ProviderName=provider_name,
|
||||
ProviderType=provider_type,
|
||||
ProviderDetails={},
|
||||
)
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_identity_providers(UserPoolId=user_pool_id, MaxResults=max_results)
|
||||
result["Providers"].should.have.length_of(max_results)
|
||||
result.should.have.key("NextToken")
|
||||
|
||||
next_token = result["NextToken"]
|
||||
result_2 = conn.list_identity_providers(UserPoolId=user_pool_id,
|
||||
MaxResults=max_results,
|
||||
NextToken=next_token)
|
||||
result_2["Providers"].should.have.length_of(max_results)
|
||||
result_2.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_identity_providers_when_max_items_more_than_total_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 identity providers linked to a user pool
|
||||
identity_provider_count = 10
|
||||
for i in range(identity_provider_count):
|
||||
provider_name = str(uuid.uuid4())
|
||||
provider_type = "Facebook"
|
||||
conn.create_identity_provider(
|
||||
UserPoolId=user_pool_id,
|
||||
ProviderName=provider_name,
|
||||
ProviderType=provider_type,
|
||||
ProviderDetails={},
|
||||
)
|
||||
|
||||
max_results = identity_provider_count + 5
|
||||
result = conn.list_identity_providers(UserPoolId=user_pool_id, MaxResults=max_results)
|
||||
result["Providers"].should.have.length_of(identity_provider_count)
|
||||
result.shouldnt.have.key("NextToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_describe_identity_providers():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
@ -396,6 +584,62 @@ def test_list_users():
|
||||
result["Users"][0]["Username"].should.equal(username)
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_returns_limit_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 users
|
||||
user_count = 10
|
||||
for i in range(user_count):
|
||||
conn.admin_create_user(UserPoolId=user_pool_id,
|
||||
Username=str(uuid.uuid4()))
|
||||
max_results = 5
|
||||
result = conn.list_users(UserPoolId=user_pool_id, Limit=max_results)
|
||||
result["Users"].should.have.length_of(max_results)
|
||||
result.should.have.key("PaginationToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_returns_pagination_tokens():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 users
|
||||
user_count = 10
|
||||
for i in range(user_count):
|
||||
conn.admin_create_user(UserPoolId=user_pool_id,
|
||||
Username=str(uuid.uuid4()))
|
||||
|
||||
max_results = 5
|
||||
result = conn.list_users(UserPoolId=user_pool_id, Limit=max_results)
|
||||
result["Users"].should.have.length_of(max_results)
|
||||
result.should.have.key("PaginationToken")
|
||||
|
||||
next_token = result["PaginationToken"]
|
||||
result_2 = conn.list_users(UserPoolId=user_pool_id,
|
||||
Limit=max_results, PaginationToken=next_token)
|
||||
result_2["Users"].should.have.length_of(max_results)
|
||||
result_2.shouldnt.have.key("PaginationToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_list_users_when_limit_more_than_total_items():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||
|
||||
# Given 10 users
|
||||
user_count = 10
|
||||
for i in range(user_count):
|
||||
conn.admin_create_user(UserPoolId=user_pool_id,
|
||||
Username=str(uuid.uuid4()))
|
||||
|
||||
max_results = user_count + 5
|
||||
result = conn.list_users(UserPoolId=user_pool_id, Limit=max_results)
|
||||
result["Users"].should.have.length_of(user_count)
|
||||
result.shouldnt.have.key("PaginationToken")
|
||||
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_admin_disable_user():
|
||||
conn = boto3.client("cognito-idp", "us-west-2")
|
||||
|
Loading…
Reference in New Issue
Block a user