Implement group management for cognito-idp

This commit is contained in:
Gary Donovan 2018-07-23 16:26:54 +10:00
parent 24e942b50e
commit 04fdd5617a
5 changed files with 214 additions and 8 deletions

View File

@ -859,14 +859,14 @@
- [ ] confirm_device
- [X] confirm_forgot_password
- [ ] confirm_sign_up
- [ ] create_group
- [X] create_group
- [X] create_identity_provider
- [ ] create_resource_server
- [ ] create_user_import_job
- [X] create_user_pool
- [X] create_user_pool_client
- [X] create_user_pool_domain
- [ ] delete_group
- [X] delete_group
- [X] delete_identity_provider
- [ ] delete_resource_server
- [ ] delete_user
@ -885,7 +885,7 @@
- [ ] forgot_password
- [ ] get_csv_header
- [ ] get_device
- [ ] get_group
- [X] get_group
- [ ] get_identity_provider_by_identifier
- [ ] get_signing_certificate
- [ ] get_ui_customization
@ -895,7 +895,7 @@
- [ ] global_sign_out
- [ ] initiate_auth
- [ ] list_devices
- [ ] list_groups
- [X] list_groups
- [X] list_identity_providers
- [ ] list_resource_servers
- [ ] list_user_import_jobs

View File

@ -24,6 +24,16 @@ class UserNotFoundError(BadRequest):
})
class GroupExistsException(BadRequest):
def __init__(self, message):
super(GroupExistsException, self).__init__()
self.description = json.dumps({
"message": message,
'__type': 'GroupExistsException',
})
class NotAuthorizedError(BadRequest):
def __init__(self, message):

View File

@ -11,8 +11,7 @@ from jose import jws
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
from .exceptions import NotAuthorizedError, ResourceNotFoundError, UserNotFoundError
from .exceptions import GroupExistsException, NotAuthorizedError, ResourceNotFoundError, UserNotFoundError
UserStatus = {
"FORCE_CHANGE_PASSWORD": "FORCE_CHANGE_PASSWORD",
@ -33,6 +32,7 @@ class CognitoIdpUserPool(BaseModel):
self.clients = OrderedDict()
self.identity_providers = OrderedDict()
self.groups = OrderedDict()
self.users = OrderedDict()
self.refresh_tokens = {}
self.access_tokens = {}
@ -185,6 +185,29 @@ class CognitoIdpIdentityProvider(BaseModel):
return identity_provider_json
class CognitoIdpGroup(BaseModel):
def __init__(self, user_pool_id, group_name, description, role_arn, precedence):
self.user_pool_id = user_pool_id
self.group_name = group_name
self.description = description or ""
self.role_arn = role_arn
self.precedence = precedence
self.last_modified_date = datetime.datetime.now()
self.creation_date = self.last_modified_date
def to_json(self):
return {
"GroupName": self.group_name,
"UserPoolId": self.user_pool_id,
"Description": self.description,
"RoleArn": self.role_arn,
"Precedence": self.precedence,
"LastModifiedDate": time.mktime(self.last_modified_date.timetuple()),
"CreationDate": time.mktime(self.creation_date.timetuple()),
}
class CognitoIdpUser(BaseModel):
def __init__(self, user_pool_id, username, password, status, attributes):
@ -367,6 +390,46 @@ class CognitoIdpBackend(BaseBackend):
del user_pool.identity_providers[name]
# Group
def create_group(self, user_pool_id, group_name, description, role_arn, precedence):
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
group = CognitoIdpGroup(user_pool_id, group_name, description, role_arn, precedence)
if group.group_name in user_pool.groups:
raise GroupExistsException("A group with the name already exists")
user_pool.groups[group.group_name] = group
return group
def get_group(self, user_pool_id, group_name):
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
if group_name not in user_pool.groups:
raise ResourceNotFoundError(group_name)
return user_pool.groups[group_name]
def list_groups(self, user_pool_id):
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
return user_pool.groups.values()
def delete_group(self, user_pool_id, group_name):
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(user_pool_id)
if group_name not in user_pool.groups:
raise ResourceNotFoundError(group_name)
del user_pool.groups[group_name]
# User
def admin_create_user(self, user_pool_id, username, temporary_password, attributes):
user_pool = self.user_pools.get(user_pool_id)

View File

@ -129,6 +129,47 @@ class CognitoIdpResponse(BaseResponse):
cognitoidp_backends[self.region].delete_identity_provider(user_pool_id, name)
return ""
# Group
def create_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
description = self._get_param("Description")
role_arn = self._get_param("RoleArn")
precedence = self._get_param("Precedence")
group = cognitoidp_backends[self.region].create_group(
user_pool_id,
group_name,
description,
role_arn,
precedence,
)
return json.dumps({
"Group": group.to_json(),
})
def get_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
group = cognitoidp_backends[self.region].get_group(user_pool_id, group_name)
return json.dumps({
"Group": group.to_json(),
})
def list_groups(self):
user_pool_id = self._get_param("UserPoolId")
groups = cognitoidp_backends[self.region].list_groups(user_pool_id)
return json.dumps({
"Groups": [group.to_json() for group in groups],
})
def delete_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
cognitoidp_backends[self.region].delete_group(user_pool_id, group_name)
return ""
# User
def admin_create_user(self):
user_pool_id = self._get_param("UserPoolId")

View File

@ -1,14 +1,18 @@
from __future__ import unicode_literals
import boto3
import json
import os
import random
import uuid
import boto3
# noinspection PyUnresolvedReferences
import sure # noqa
from botocore.exceptions import ClientError
from jose import jws
from nose.tools import assert_raises
from moto import mock_cognitoidp
import sure # noqa
@mock_cognitoidp
@ -323,6 +327,94 @@ def test_delete_identity_providers():
caught.should.be.true
@mock_cognitoidp
def test_create_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
description = str(uuid.uuid4())
role_arn = "arn:aws:iam:::role/my-iam-role"
precedence = random.randint(0, 100000)
result = conn.create_group(
GroupName=group_name,
UserPoolId=user_pool_id,
Description=description,
RoleArn=role_arn,
Precedence=precedence,
)
result["Group"]["GroupName"].should.equal(group_name)
result["Group"]["UserPoolId"].should.equal(user_pool_id)
result["Group"]["Description"].should.equal(description)
result["Group"]["RoleArn"].should.equal(role_arn)
result["Group"]["Precedence"].should.equal(precedence)
result["Group"]["LastModifiedDate"].should.be.a("datetime.datetime")
result["Group"]["CreationDate"].should.be.a("datetime.datetime")
@mock_cognitoidp
def test_create_group_with_duplicate_name_raises_error():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
with assert_raises(ClientError) as cm:
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
cm.exception.operation_name.should.equal('CreateGroup')
cm.exception.response['Error']['Code'].should.equal('GroupExistsException')
cm.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
@mock_cognitoidp
def test_get_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
result = conn.get_group(GroupName=group_name, UserPoolId=user_pool_id)
result["Group"]["GroupName"].should.equal(group_name)
result["Group"]["UserPoolId"].should.equal(user_pool_id)
result["Group"]["LastModifiedDate"].should.be.a("datetime.datetime")
result["Group"]["CreationDate"].should.be.a("datetime.datetime")
@mock_cognitoidp
def test_list_groups():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
result = conn.list_groups(UserPoolId=user_pool_id)
result["Groups"].should.have.length_of(1)
result["Groups"][0]["GroupName"].should.equal(group_name)
@mock_cognitoidp
def test_delete_group():
conn = boto3.client("cognito-idp", "us-west-2")
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
group_name = str(uuid.uuid4())
conn.create_group(GroupName=group_name, UserPoolId=user_pool_id)
conn.delete_group(GroupName=group_name, UserPoolId=user_pool_id)
with assert_raises(ClientError) as cm:
conn.get_group(GroupName=group_name, UserPoolId=user_pool_id)
cm.exception.response['Error']['Code'].should.equal('ResourceNotFoundException')
@mock_cognitoidp
def test_admin_create_user():
conn = boto3.client("cognito-idp", "us-west-2")