Merge pull request #1730 from garyd203/cognitoidp-groups

Implement groups for cognito-idp
This commit is contained in:
Steve Pulec 2018-12-28 19:53:53 -05:00 committed by GitHub
commit a62f15b537
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 474 additions and 37 deletions

View File

@ -827,25 +827,25 @@
- [ ] unlink_identity
- [ ] update_identity_pool
## cognito-idp - 0% implemented
## cognito-idp - 34% implemented
- [ ] add_custom_attributes
- [ ] admin_add_user_to_group
- [X] admin_add_user_to_group
- [ ] admin_confirm_sign_up
- [ ] admin_create_user
- [ ] admin_delete_user
- [X] admin_create_user
- [X] admin_delete_user
- [ ] admin_delete_user_attributes
- [ ] admin_disable_provider_for_user
- [X] admin_disable_user
- [X] admin_enable_user
- [ ] admin_forget_device
- [ ] admin_get_device
- [ ] admin_get_user
- [ ] admin_initiate_auth
- [X] admin_get_user
- [X] admin_initiate_auth
- [ ] admin_link_provider_for_user
- [ ] admin_list_devices
- [ ] admin_list_groups_for_user
- [X] admin_list_groups_for_user
- [ ] admin_list_user_auth_events
- [ ] admin_remove_user_from_group
- [X] admin_remove_user_from_group
- [ ] admin_reset_user_password
- [ ] admin_respond_to_auth_challenge
- [ ] admin_set_user_mfa_preference
@ -855,37 +855,37 @@
- [ ] admin_update_user_attributes
- [ ] admin_user_global_sign_out
- [ ] associate_software_token
- [ ] change_password
- [X] change_password
- [ ] confirm_device
- [ ] confirm_forgot_password
- [X] confirm_forgot_password
- [ ] confirm_sign_up
- [ ] create_group
- [ ] create_identity_provider
- [X] create_group
- [X] create_identity_provider
- [ ] create_resource_server
- [ ] create_user_import_job
- [ ] create_user_pool
- [ ] create_user_pool_client
- [ ] create_user_pool_domain
- [ ] delete_group
- [ ] delete_identity_provider
- [X] create_user_pool
- [X] create_user_pool_client
- [X] create_user_pool_domain
- [X] delete_group
- [X] delete_identity_provider
- [ ] delete_resource_server
- [ ] delete_user
- [ ] delete_user_attributes
- [ ] delete_user_pool
- [ ] delete_user_pool_client
- [ ] delete_user_pool_domain
- [ ] describe_identity_provider
- [X] delete_user_pool
- [X] delete_user_pool_client
- [X] delete_user_pool_domain
- [X] describe_identity_provider
- [ ] describe_resource_server
- [ ] describe_risk_configuration
- [ ] describe_user_import_job
- [ ] describe_user_pool
- [ ] describe_user_pool_client
- [ ] describe_user_pool_domain
- [X] describe_user_pool
- [X] describe_user_pool_client
- [X] describe_user_pool_domain
- [ ] forget_device
- [ ] forgot_password
- [ ] get_csv_header
- [ ] get_device
- [ ] get_group
- [X] get_group
- [ ] get_identity_provider_by_identifier
- [ ] get_signing_certificate
- [ ] get_ui_customization
@ -895,16 +895,16 @@
- [ ] global_sign_out
- [ ] initiate_auth
- [ ] list_devices
- [ ] list_groups
- [ ] list_identity_providers
- [X] list_groups
- [X] list_identity_providers
- [ ] list_resource_servers
- [ ] list_user_import_jobs
- [ ] list_user_pool_clients
- [ ] list_user_pools
- [ ] list_users
- [ ] list_users_in_group
- [X] list_user_pool_clients
- [X] list_user_pools
- [X] list_users
- [X] list_users_in_group
- [ ] resend_confirmation_code
- [ ] respond_to_auth_challenge
- [X] respond_to_auth_challenge
- [ ] set_risk_configuration
- [ ] set_ui_customization
- [ ] set_user_mfa_preference
@ -920,7 +920,7 @@
- [ ] update_resource_server
- [ ] update_user_attributes
- [ ] update_user_pool
- [ ] update_user_pool_client
- [X] update_user_pool_client
- [ ] verify_software_token
- [ ] verify_user_attribute

View File

@ -24,6 +24,16 @@ class UserNotFoundError(BadRequest):
})
class GroupExistsException(BadRequest):
def __init__(self, message):
super(GroupExistsException, self).__init__()
self.description = json.dumps({
"message": message,
'__type': 'GroupExistsException',
})
class NotAuthorizedError(BadRequest):
def __init__(self, message):

View File

@ -13,8 +13,7 @@ from jose import jws
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
from .exceptions import NotAuthorizedError, ResourceNotFoundError, UserNotFoundError
from .exceptions import GroupExistsException, NotAuthorizedError, ResourceNotFoundError, UserNotFoundError
UserStatus = {
"FORCE_CHANGE_PASSWORD": "FORCE_CHANGE_PASSWORD",
@ -68,6 +67,7 @@ class CognitoIdpUserPool(BaseModel):
self.clients = OrderedDict()
self.identity_providers = OrderedDict()
self.groups = OrderedDict()
self.users = OrderedDict()
self.refresh_tokens = {}
self.access_tokens = {}
@ -220,6 +220,33 @@ class CognitoIdpIdentityProvider(BaseModel):
return identity_provider_json
class CognitoIdpGroup(BaseModel):
def __init__(self, user_pool_id, group_name, description, role_arn, precedence):
self.user_pool_id = user_pool_id
self.group_name = group_name
self.description = description or ""
self.role_arn = role_arn
self.precedence = precedence
self.last_modified_date = datetime.datetime.now()
self.creation_date = self.last_modified_date
# Users who are members of this group.
# Note that these links are bidirectional.
self.users = set()
def to_json(self):
return {
"GroupName": self.group_name,
"UserPoolId": self.user_pool_id,
"Description": self.description,
"RoleArn": self.role_arn,
"Precedence": self.precedence,
"LastModifiedDate": time.mktime(self.last_modified_date.timetuple()),
"CreationDate": time.mktime(self.creation_date.timetuple()),
}
class CognitoIdpUser(BaseModel):
def __init__(self, user_pool_id, username, password, status, attributes):
@ -233,6 +260,10 @@ class CognitoIdpUser(BaseModel):
self.create_date = datetime.datetime.utcnow()
self.last_modified_date = datetime.datetime.utcnow()
# Groups this user is a member of.
# Note that these links are bidirectional.
self.groups = set()
def _base_json(self):
return {
"UserPoolId": self.user_pool_id,
@ -405,6 +436,72 @@ class CognitoIdpBackend(BaseBackend):
del user_pool.identity_providers[name]
# Group
def create_group(self, user_pool_id, group_name, description, role_arn, precedence):
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
group = CognitoIdpGroup(user_pool_id, group_name, description, role_arn, precedence)
if group.group_name in user_pool.groups:
raise GroupExistsException("A group with the name already exists")
user_pool.groups[group.group_name] = group
return group
def get_group(self, user_pool_id, group_name):
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
if group_name not in user_pool.groups:
raise ResourceNotFoundError(group_name)
return user_pool.groups[group_name]
def list_groups(self, user_pool_id):
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
return user_pool.groups.values()
def delete_group(self, user_pool_id, group_name):
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
if group_name not in user_pool.groups:
raise ResourceNotFoundError(group_name)
group = user_pool.groups[group_name]
for user in group.users:
user.groups.remove(group)
del user_pool.groups[group_name]
def admin_add_user_to_group(self, user_pool_id, group_name, username):
group = self.get_group(user_pool_id, group_name)
user = self.admin_get_user(user_pool_id, username)
group.users.add(user)
user.groups.add(group)
def list_users_in_group(self, user_pool_id, group_name):
group = self.get_group(user_pool_id, group_name)
return list(group.users)
def admin_list_groups_for_user(self, user_pool_id, username):
user = self.admin_get_user(user_pool_id, username)
return list(user.groups)
def admin_remove_user_from_group(self, user_pool_id, group_name, username):
group = self.get_group(user_pool_id, group_name)
user = self.admin_get_user(user_pool_id, username)
group.users.discard(user)
user.groups.discard(group)
# User
def admin_create_user(self, user_pool_id, username, temporary_password, attributes):
user_pool = self.user_pools.get(user_pool_id)
@ -449,6 +546,10 @@ class CognitoIdpBackend(BaseBackend):
if username not in user_pool.users:
raise UserNotFoundError(username)
user = user_pool.users[username]
for group in user.groups:
group.users.remove(user)
del user_pool.users[username]
def _log_user_in(self, user_pool, client, username):

View File

@ -149,6 +149,89 @@ class CognitoIdpResponse(BaseResponse):
cognitoidp_backends[self.region].delete_identity_provider(user_pool_id, name)
return ""
# Group
def create_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
description = self._get_param("Description")
role_arn = self._get_param("RoleArn")
precedence = self._get_param("Precedence")
group = cognitoidp_backends[self.region].create_group(
user_pool_id,
group_name,
description,
role_arn,
precedence,
)
return json.dumps({
"Group": group.to_json(),
})
def get_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
group = cognitoidp_backends[self.region].get_group(user_pool_id, group_name)
return json.dumps({
"Group": group.to_json(),
})
def list_groups(self):
user_pool_id = self._get_param("UserPoolId")
groups = cognitoidp_backends[self.region].list_groups(user_pool_id)
return json.dumps({
"Groups": [group.to_json() for group in groups],
})
def delete_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
cognitoidp_backends[self.region].delete_group(user_pool_id, group_name)
return ""
def admin_add_user_to_group(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
group_name = self._get_param("GroupName")
cognitoidp_backends[self.region].admin_add_user_to_group(
user_pool_id,
group_name,
username,
)
return ""
def list_users_in_group(self):
user_pool_id = self._get_param("UserPoolId")
group_name = self._get_param("GroupName")
users = cognitoidp_backends[self.region].list_users_in_group(user_pool_id, group_name)
return json.dumps({
"Users": [user.to_json(extended=True) for user in users],
})
def admin_list_groups_for_user(self):
username = self._get_param("Username")
user_pool_id = self._get_param("UserPoolId")
groups = cognitoidp_backends[self.region].admin_list_groups_for_user(user_pool_id, username)
return json.dumps({
"Groups": [group.to_json() for group in groups],
})
def admin_remove_user_from_group(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
group_name = self._get_param("GroupName")
cognitoidp_backends[self.region].admin_remove_user_from_group(
user_pool_id,
group_name,
username,
)
return ""
# User
def admin_create_user(self):
user_pool_id = self._get_param("UserPoolId")

View File

@ -1,14 +1,18 @@
from __future__ import unicode_literals
import boto3
import json
import os
import random
import uuid
import boto3
# noinspection PyUnresolvedReferences
import sure # noqa
from botocore.exceptions import ClientError
from jose import jws
from nose.tools import assert_raises
from moto import mock_cognitoidp
import sure # noqa
@mock_cognitoidp
@ -511,6 +515,245 @@ def test_delete_identity_providers():
caught.should.be.true
@mock_cognitoidp
def test_create_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
description = str(uuid.uuid4())
role_arn = "arn:aws:iam:::role/my-iam-role"
precedence = random.randint(0, 100000)
result = conn.create_group(
GroupName=group_name,
UserPoolId=user_pool_id,
Description=description,
RoleArn=role_arn,
Precedence=precedence,
)
result["Group"]["GroupName"].should.equal(group_name)
result["Group"]["UserPoolId"].should.equal(user_pool_id)
result["Group"]["Description"].should.equal(description)
result["Group"]["RoleArn"].should.equal(role_arn)
result["Group"]["Precedence"].should.equal(precedence)
result["Group"]["LastModifiedDate"].should.be.a("datetime.datetime")
result["Group"]["CreationDate"].should.be.a("datetime.datetime")
@mock_cognitoidp
def test_create_group_with_duplicate_name_raises_error():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
with assert_raises(ClientError) as cm:
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
cm.exception.operation_name.should.equal('CreateGroup')
cm.exception.response['Error']['Code'].should.equal('GroupExistsException')
cm.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
@mock_cognitoidp
def test_get_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
result = conn.get_group(GroupName=group_name, UserPoolId=user_pool_id)
result["Group"]["GroupName"].should.equal(group_name)
result["Group"]["UserPoolId"].should.equal(user_pool_id)
result["Group"]["LastModifiedDate"].should.be.a("datetime.datetime")
result["Group"]["CreationDate"].should.be.a("datetime.datetime")
@mock_cognitoidp
def test_list_groups():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
result = conn.list_groups(UserPoolId=user_pool_id)
result["Groups"].should.have.length_of(1)
result["Groups"][0]["GroupName"].should.equal(group_name)
@mock_cognitoidp
def test_delete_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
result = conn.delete_group(GroupName=group_name, UserPoolId=user_pool_id)
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
with assert_raises(ClientError) as cm:
conn.get_group(GroupName=group_name, UserPoolId=user_pool_id)
cm.exception.response['Error']['Code'].should.equal('ResourceNotFoundException')
@mock_cognitoidp
def test_admin_add_user_to_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
result = conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
@mock_cognitoidp
def test_admin_add_user_to_group_again_is_noop():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
@mock_cognitoidp
def test_list_users_in_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
result = conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name)
result["Users"].should.have.length_of(1)
result["Users"][0]["Username"].should.equal(username)
@mock_cognitoidp
def test_list_users_in_group_ignores_deleted_user():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
username2 = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username2)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username2, GroupName=group_name)
conn.admin_delete_user(UserPoolId=user_pool_id, Username=username)
result = conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name)
result["Users"].should.have.length_of(1)
result["Users"][0]["Username"].should.equal(username2)
@mock_cognitoidp
def test_admin_list_groups_for_user():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
result = conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id)
result["Groups"].should.have.length_of(1)
result["Groups"][0]["GroupName"].should.equal(group_name)
@mock_cognitoidp
def test_admin_list_groups_for_user_ignores_deleted_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
group_name2 = str(uuid.uuid4())
conn.create_group(GroupName=group_name2, UserPoolId=user_pool_id)
username = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name2)
conn.delete_group(GroupName=group_name, UserPoolId=user_pool_id)
result = conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id)
result["Groups"].should.have.length_of(1)
result["Groups"][0]["GroupName"].should.equal(group_name2)
@mock_cognitoidp
def test_admin_remove_user_from_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
result = conn.admin_remove_user_from_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected
conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name) \
["Users"].should.have.length_of(0)
conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id) \
["Groups"].should.have.length_of(0)
@mock_cognitoidp
def test_admin_remove_user_from_group_again_is_noop():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
username = str(uuid.uuid4())
conn.admin_create_user(UserPoolId=user_pool_id, Username=username)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name)
@mock_cognitoidp
def test_admin_create_user():
conn = boto3.client("cognito-idp", "us-west-2")