From bca50472021cb97c8a7d6063f68cc4dc86161119 Mon Sep 17 00:00:00 2001 From: Brian Pandola Date: Fri, 11 Aug 2017 17:57:06 -0700 Subject: [PATCH] Implement additional IAM endpoints - attach_user_policy - detach_user_policy - list_attached_user_policies --- moto/iam/models.py | 37 ++++++++++++++++++++++++ moto/iam/responses.py | 59 ++++++++++++++++++++++++++++++++++++++ tests/test_iam/test_iam.py | 27 +++++++++++++++++ 3 files changed, 123 insertions(+) diff --git a/moto/iam/models.py b/moto/iam/models.py index e30ad09d4..e6f8bae63 100644 --- a/moto/iam/models.py +++ b/moto/iam/models.py @@ -80,6 +80,14 @@ class ManagedPolicy(Policy): self.attachment_count -= 1 del role.managed_policies[self.name] + def attach_to_user(self, user): + self.attachment_count += 1 + user.managed_policies[self.name] = self + + def detach_from_user(self, user): + self.attachment_count -= 1 + del user.managed_policies[self.name] + class AWSManagedPolicy(ManagedPolicy): """AWS-managed policy.""" @@ -265,6 +273,7 @@ class User(BaseModel): self.created = datetime.utcnow() self.mfa_devices = {} self.policies = {} + self.managed_policies = {} self.access_keys = [] self.password = None self.password_reset_required = False @@ -516,6 +525,16 @@ class IAMBackend(BaseBackend): except KeyError: raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn)) + def attach_user_policy(self, policy_arn, user_name): + arns = dict((p.arn, p) for p in self.managed_policies.values()) + policy = arns[policy_arn] + policy.attach_to_user(self.get_user(user_name)) + + def detach_user_policy(self, policy_arn, user_name): + arns = dict((p.arn, p) for p in self.managed_policies.values()) + policy = arns[policy_arn] + policy.detach_from_user(self.get_user(user_name)) + def create_policy(self, description, path, policy_document, policy_name): policy = ManagedPolicy( policy_name, @@ -547,6 +566,24 @@ class IAMBackend(BaseBackend): return policies, marker + def list_attached_user_policies(self, user_name, marker=None, max_items=100, path_prefix='/'): + policies = self.get_user(user_name).managed_policies.values() + + if path_prefix: + policies = [p for p in policies if p.path.startswith(path_prefix)] + + policies = sorted(policies, key=lambda policy: policy.name) + start_idx = int(marker) if marker else 0 + + policies = policies[start_idx:start_idx + max_items] + + if len(policies) < max_items: + marker = None + else: + marker = str(start_idx + max_items) + + return policies, marker + def list_policies(self, marker, max_items, only_attached, path_prefix, scope): policies = self.managed_policies.values() diff --git a/moto/iam/responses.py b/moto/iam/responses.py index 5929a2005..e79d8bc80 100644 --- a/moto/iam/responses.py +++ b/moto/iam/responses.py @@ -20,6 +20,20 @@ class IamResponse(BaseResponse): template = self.response_template(GENERIC_EMPTY_TEMPLATE) return template.render(name="DetachRolePolicyResponse") + def attach_user_policy(self): + policy_arn = self._get_param('PolicyArn') + user_name = self._get_param('UserName') + iam_backend.attach_user_policy(policy_arn, user_name) + template = self.response_template(ATTACH_USER_POLICY_TEMPLATE) + return template.render() + + def detach_user_policy(self): + policy_arn = self._get_param('PolicyArn') + user_name = self._get_param('UserName') + iam_backend.detach_user_policy(policy_arn, user_name) + template = self.response_template(DETACH_USER_POLICY_TEMPLATE) + return template.render() + def create_policy(self): description = self._get_param('Description') path = self._get_param('Path') @@ -40,6 +54,17 @@ class IamResponse(BaseResponse): template = self.response_template(LIST_ATTACHED_ROLE_POLICIES_TEMPLATE) return template.render(policies=policies, marker=marker) + def list_attached_user_policies(self): + marker = self._get_param('Marker') + max_items = self._get_int_param('MaxItems', 100) + path_prefix = self._get_param('PathPrefix', '/') + user_name = self._get_param('UserName') + policies, marker = iam_backend.list_attached_user_policies( + user_name, marker=marker, max_items=max_items, + path_prefix=path_prefix) + template = self.response_template(LIST_ATTACHED_USER_POLICIES_TEMPLATE) + return template.render(policies=policies, marker=marker) + def list_policies(self): marker = self._get_param('Marker') max_items = self._get_int_param('MaxItems', 100) @@ -466,6 +491,18 @@ DETACH_ROLE_POLICY_TEMPLATE = """ """ +ATTACH_USER_POLICY_TEMPLATE = """ + + 7a62c49f-347e-4fc4-9331-6e8eEXAMPLE + +""" + +DETACH_USER_POLICY_TEMPLATE = """ + + 7a62c49f-347e-4fc4-9331-6e8eEXAMPLE + +""" + CREATE_POLICY_TEMPLATE = """ @@ -506,6 +543,28 @@ LIST_ATTACHED_ROLE_POLICIES_TEMPLATE = """ """ +LIST_ATTACHED_USER_POLICIES_TEMPLATE = """ + + {% if marker is none %} + false + {% else %} + true + {{ marker }} + {% endif %} + + {% for policy in policies %} + + {{ policy.name }} + {{ policy.arn }} + + {% endfor %} + + + + 7a62c49f-347e-4fc4-9331-6e8eEXAMPLE + +""" + LIST_POLICIES_TEMPLATE = """ {% if marker is none %} diff --git a/tests/test_iam/test_iam.py b/tests/test_iam/test_iam.py index 335b458ea..3c567136c 100644 --- a/tests/test_iam/test_iam.py +++ b/tests/test_iam/test_iam.py @@ -599,3 +599,30 @@ def test_boto3_create_login_profile(): with assert_raises(ClientError): conn.create_login_profile(UserName='my-user', Password='Password') + + +@mock_iam() +def test_attach_detach_user_policy(): + iam = boto3.resource('iam', region_name='us-east-1') + client = boto3.client('iam', region_name='us-east-1') + + user = iam.create_user(UserName='test-user') + + policy_name = 'UserAttachedPolicy' + policy = iam.create_policy(PolicyName=policy_name, + PolicyDocument='{"mypolicy": "test"}', + Path='/mypolicy/', + Description='my user attached policy') + + client.attach_user_policy(UserName=user.name, PolicyArn=policy.arn) + + resp = client.list_attached_user_policies(UserName=user.name) + resp['AttachedPolicies'].should.have.length_of(1) + attached_policy = resp['AttachedPolicies'][0] + attached_policy['PolicyArn'].should.equal(policy.arn) + attached_policy['PolicyName'].should.equal(policy_name) + + client.detach_user_policy(UserName=user.name, PolicyArn=policy.arn) + + resp = client.list_attached_user_policies(UserName=user.name) + resp['AttachedPolicies'].should.have.length_of(0)