From ffa7560d026b9d0a3380b3c7cb3006d9e89af76a Mon Sep 17 00:00:00 2001 From: Gary Donovan Date: Mon, 23 Jul 2018 20:54:51 +1000 Subject: [PATCH] Implement user-group relationships for cognito-idp --- IMPLEMENTATION_COVERAGE.md | 10 +- moto/cognitoidp/models.py | 38 ++++++ moto/cognitoidp/responses.py | 42 +++++++ tests/test_cognitoidp/test_cognitoidp.py | 153 ++++++++++++++++++++++- 4 files changed, 237 insertions(+), 6 deletions(-) diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index cceaa3c8a..47420e8a4 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -827,9 +827,9 @@ - [ ] unlink_identity - [ ] update_identity_pool -## cognito-idp - 0% implemented +## cognito-idp - 34% implemented - [ ] add_custom_attributes -- [ ] admin_add_user_to_group +- [X] admin_add_user_to_group - [ ] admin_confirm_sign_up - [X] admin_create_user - [X] admin_delete_user @@ -843,9 +843,9 @@ - [X] admin_initiate_auth - [ ] admin_link_provider_for_user - [ ] admin_list_devices -- [ ] admin_list_groups_for_user +- [X] admin_list_groups_for_user - [ ] admin_list_user_auth_events -- [ ] admin_remove_user_from_group +- [X] admin_remove_user_from_group - [ ] admin_reset_user_password - [ ] admin_respond_to_auth_challenge - [ ] admin_set_user_mfa_preference @@ -902,7 +902,7 @@ - [X] list_user_pool_clients - [X] list_user_pools - [X] list_users -- [ ] list_users_in_group +- [X] list_users_in_group - [ ] resend_confirmation_code - [X] respond_to_auth_challenge - [ ] set_risk_configuration diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index a11eb435c..349d1e319 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -196,6 +196,10 @@ class CognitoIdpGroup(BaseModel): self.last_modified_date = datetime.datetime.now() self.creation_date = self.last_modified_date + # Users who are members of this group. + # Note that these links are bidirectional. + self.users = set() + def to_json(self): return { "GroupName": self.group_name, @@ -221,6 +225,10 @@ class CognitoIdpUser(BaseModel): self.create_date = datetime.datetime.utcnow() self.last_modified_date = datetime.datetime.utcnow() + # Groups this user is a member of. + # Note that these links are bidirectional. + self.groups = set() + def _base_json(self): return { "UserPoolId": self.user_pool_id, @@ -428,8 +436,34 @@ class CognitoIdpBackend(BaseBackend): if group_name not in user_pool.groups: raise ResourceNotFoundError(group_name) + group = user_pool.groups[group_name] + for user in group.users: + user.groups.remove(group) + del user_pool.groups[group_name] + def admin_add_user_to_group(self, user_pool_id, group_name, username): + group = self.get_group(user_pool_id, group_name) + user = self.admin_get_user(user_pool_id, username) + + group.users.add(user) + user.groups.add(group) + + def list_users_in_group(self, user_pool_id, group_name): + group = self.get_group(user_pool_id, group_name) + return list(group.users) + + def admin_list_groups_for_user(self, user_pool_id, username): + user = self.admin_get_user(user_pool_id, username) + return list(user.groups) + + def admin_remove_user_from_group(self, user_pool_id, group_name, username): + group = self.get_group(user_pool_id, group_name) + user = self.admin_get_user(user_pool_id, username) + + group.users.discard(user) + user.groups.discard(group) + # User def admin_create_user(self, user_pool_id, username, temporary_password, attributes): user_pool = self.user_pools.get(user_pool_id) @@ -465,6 +499,10 @@ class CognitoIdpBackend(BaseBackend): if username not in user_pool.users: raise ResourceNotFoundError(username) + user = user_pool.users[username] + for group in user.groups: + group.users.remove(user) + del user_pool.users[username] def _log_user_in(self, user_pool, client, username): diff --git a/moto/cognitoidp/responses.py b/moto/cognitoidp/responses.py index a2f67ba84..7d19fa3e8 100644 --- a/moto/cognitoidp/responses.py +++ b/moto/cognitoidp/responses.py @@ -170,6 +170,48 @@ class CognitoIdpResponse(BaseResponse): cognitoidp_backends[self.region].delete_group(user_pool_id, group_name) return "" + def admin_add_user_to_group(self): + user_pool_id = self._get_param("UserPoolId") + username = self._get_param("Username") + group_name = self._get_param("GroupName") + + cognitoidp_backends[self.region].admin_add_user_to_group( + user_pool_id, + group_name, + username, + ) + + return "" + + def list_users_in_group(self): + user_pool_id = self._get_param("UserPoolId") + group_name = self._get_param("GroupName") + users = cognitoidp_backends[self.region].list_users_in_group(user_pool_id, group_name) + return json.dumps({ + "Users": [user.to_json(extended=True) for user in users], + }) + + def admin_list_groups_for_user(self): + username = self._get_param("Username") + user_pool_id = self._get_param("UserPoolId") + groups = cognitoidp_backends[self.region].admin_list_groups_for_user(user_pool_id, username) + return json.dumps({ + "Groups": [group.to_json() for group in groups], + }) + + def admin_remove_user_from_group(self): + user_pool_id = self._get_param("UserPoolId") + username = self._get_param("Username") + group_name = self._get_param("GroupName") + + cognitoidp_backends[self.region].admin_remove_user_from_group( + user_pool_id, + group_name, + username, + ) + + return "" + # User def admin_create_user(self): user_pool_id = self._get_param("UserPoolId") diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index c226f91b7..07edc990c 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -408,13 +408,164 @@ def test_delete_group(): group_name = str(uuid.uuid4()) conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) - conn.delete_group(GroupName=group_name, UserPoolId=user_pool_id) + result = conn.delete_group(GroupName=group_name, UserPoolId=user_pool_id) + list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected with assert_raises(ClientError) as cm: conn.get_group(GroupName=group_name, UserPoolId=user_pool_id) cm.exception.response['Error']['Code'].should.equal('ResourceNotFoundException') +@mock_cognitoidp +def test_admin_add_user_to_group(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + result = conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected + + +@mock_cognitoidp +def test_admin_add_user_to_group_again_is_noop(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + + +@mock_cognitoidp +def test_list_users_in_group(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + + result = conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name) + + result["Users"].should.have.length_of(1) + result["Users"][0]["Username"].should.equal(username) + + +@mock_cognitoidp +def test_list_users_in_group_ignores_deleted_user(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + username2 = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username2) + + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username2, GroupName=group_name) + conn.admin_delete_user(UserPoolId=user_pool_id, Username=username) + + result = conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name) + + result["Users"].should.have.length_of(1) + result["Users"][0]["Username"].should.equal(username2) + + +@mock_cognitoidp +def test_admin_list_groups_for_user(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + + result = conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id) + + result["Groups"].should.have.length_of(1) + result["Groups"][0]["GroupName"].should.equal(group_name) + + +@mock_cognitoidp +def test_admin_list_groups_for_user_ignores_deleted_group(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + group_name2 = str(uuid.uuid4()) + conn.create_group(GroupName=group_name2, UserPoolId=user_pool_id) + + username = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name2) + conn.delete_group(GroupName=group_name, UserPoolId=user_pool_id) + + result = conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id) + + result["Groups"].should.have.length_of(1) + result["Groups"][0]["GroupName"].should.equal(group_name2) + + +@mock_cognitoidp +def test_admin_remove_user_from_group(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + + result = conn.admin_remove_user_from_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + list(result.keys()).should.equal(["ResponseMetadata"]) # No response expected + conn.list_users_in_group(UserPoolId=user_pool_id, GroupName=group_name) \ + ["Users"].should.have.length_of(0) + conn.admin_list_groups_for_user(Username=username, UserPoolId=user_pool_id) \ + ["Groups"].should.have.length_of(0) + + +@mock_cognitoidp +def test_admin_remove_user_from_group_again_is_noop(): + conn = boto3.client("cognito-idp", "us-west-2") + + user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] + group_name = str(uuid.uuid4()) + conn.create_group(GroupName=group_name, UserPoolId=user_pool_id) + + username = str(uuid.uuid4()) + conn.admin_create_user(UserPoolId=user_pool_id, Username=username) + + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + conn.admin_add_user_to_group(UserPoolId=user_pool_id, Username=username, GroupName=group_name) + + @mock_cognitoidp def test_admin_create_user(): conn = boto3.client("cognito-idp", "us-west-2")