Implement additional IAM endpoints
- attach_user_policy - detach_user_policy - list_attached_user_policies
This commit is contained in:
parent
d22671a756
commit
bca5047202
@ -80,6 +80,14 @@ class ManagedPolicy(Policy):
|
||||
self.attachment_count -= 1
|
||||
del role.managed_policies[self.name]
|
||||
|
||||
def attach_to_user(self, user):
|
||||
self.attachment_count += 1
|
||||
user.managed_policies[self.name] = self
|
||||
|
||||
def detach_from_user(self, user):
|
||||
self.attachment_count -= 1
|
||||
del user.managed_policies[self.name]
|
||||
|
||||
|
||||
class AWSManagedPolicy(ManagedPolicy):
|
||||
"""AWS-managed policy."""
|
||||
@ -265,6 +273,7 @@ class User(BaseModel):
|
||||
self.created = datetime.utcnow()
|
||||
self.mfa_devices = {}
|
||||
self.policies = {}
|
||||
self.managed_policies = {}
|
||||
self.access_keys = []
|
||||
self.password = None
|
||||
self.password_reset_required = False
|
||||
@ -516,6 +525,16 @@ class IAMBackend(BaseBackend):
|
||||
except KeyError:
|
||||
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
|
||||
|
||||
def attach_user_policy(self, policy_arn, user_name):
|
||||
arns = dict((p.arn, p) for p in self.managed_policies.values())
|
||||
policy = arns[policy_arn]
|
||||
policy.attach_to_user(self.get_user(user_name))
|
||||
|
||||
def detach_user_policy(self, policy_arn, user_name):
|
||||
arns = dict((p.arn, p) for p in self.managed_policies.values())
|
||||
policy = arns[policy_arn]
|
||||
policy.detach_from_user(self.get_user(user_name))
|
||||
|
||||
def create_policy(self, description, path, policy_document, policy_name):
|
||||
policy = ManagedPolicy(
|
||||
policy_name,
|
||||
@ -547,6 +566,24 @@ class IAMBackend(BaseBackend):
|
||||
|
||||
return policies, marker
|
||||
|
||||
def list_attached_user_policies(self, user_name, marker=None, max_items=100, path_prefix='/'):
|
||||
policies = self.get_user(user_name).managed_policies.values()
|
||||
|
||||
if path_prefix:
|
||||
policies = [p for p in policies if p.path.startswith(path_prefix)]
|
||||
|
||||
policies = sorted(policies, key=lambda policy: policy.name)
|
||||
start_idx = int(marker) if marker else 0
|
||||
|
||||
policies = policies[start_idx:start_idx + max_items]
|
||||
|
||||
if len(policies) < max_items:
|
||||
marker = None
|
||||
else:
|
||||
marker = str(start_idx + max_items)
|
||||
|
||||
return policies, marker
|
||||
|
||||
def list_policies(self, marker, max_items, only_attached, path_prefix, scope):
|
||||
policies = self.managed_policies.values()
|
||||
|
||||
|
@ -20,6 +20,20 @@ class IamResponse(BaseResponse):
|
||||
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
|
||||
return template.render(name="DetachRolePolicyResponse")
|
||||
|
||||
def attach_user_policy(self):
|
||||
policy_arn = self._get_param('PolicyArn')
|
||||
user_name = self._get_param('UserName')
|
||||
iam_backend.attach_user_policy(policy_arn, user_name)
|
||||
template = self.response_template(ATTACH_USER_POLICY_TEMPLATE)
|
||||
return template.render()
|
||||
|
||||
def detach_user_policy(self):
|
||||
policy_arn = self._get_param('PolicyArn')
|
||||
user_name = self._get_param('UserName')
|
||||
iam_backend.detach_user_policy(policy_arn, user_name)
|
||||
template = self.response_template(DETACH_USER_POLICY_TEMPLATE)
|
||||
return template.render()
|
||||
|
||||
def create_policy(self):
|
||||
description = self._get_param('Description')
|
||||
path = self._get_param('Path')
|
||||
@ -40,6 +54,17 @@ class IamResponse(BaseResponse):
|
||||
template = self.response_template(LIST_ATTACHED_ROLE_POLICIES_TEMPLATE)
|
||||
return template.render(policies=policies, marker=marker)
|
||||
|
||||
def list_attached_user_policies(self):
|
||||
marker = self._get_param('Marker')
|
||||
max_items = self._get_int_param('MaxItems', 100)
|
||||
path_prefix = self._get_param('PathPrefix', '/')
|
||||
user_name = self._get_param('UserName')
|
||||
policies, marker = iam_backend.list_attached_user_policies(
|
||||
user_name, marker=marker, max_items=max_items,
|
||||
path_prefix=path_prefix)
|
||||
template = self.response_template(LIST_ATTACHED_USER_POLICIES_TEMPLATE)
|
||||
return template.render(policies=policies, marker=marker)
|
||||
|
||||
def list_policies(self):
|
||||
marker = self._get_param('Marker')
|
||||
max_items = self._get_int_param('MaxItems', 100)
|
||||
@ -466,6 +491,18 @@ DETACH_ROLE_POLICY_TEMPLATE = """<DetachRolePolicyResponse>
|
||||
</ResponseMetadata>
|
||||
</DetachRolePolicyResponse>"""
|
||||
|
||||
ATTACH_USER_POLICY_TEMPLATE = """<AttachUserPolicyResponse>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</AttachUserPolicyResponse>"""
|
||||
|
||||
DETACH_USER_POLICY_TEMPLATE = """<DetachUserPolicyResponse>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</DetachUserPolicyResponse>"""
|
||||
|
||||
CREATE_POLICY_TEMPLATE = """<CreatePolicyResponse>
|
||||
<CreatePolicyResult>
|
||||
<Policy>
|
||||
@ -506,6 +543,28 @@ LIST_ATTACHED_ROLE_POLICIES_TEMPLATE = """<ListAttachedRolePoliciesResponse>
|
||||
</ResponseMetadata>
|
||||
</ListAttachedRolePoliciesResponse>"""
|
||||
|
||||
LIST_ATTACHED_USER_POLICIES_TEMPLATE = """<ListAttachedUserPoliciesResponse>
|
||||
<ListAttachedUserPoliciesResult>
|
||||
{% if marker is none %}
|
||||
<IsTruncated>false</IsTruncated>
|
||||
{% else %}
|
||||
<IsTruncated>true</IsTruncated>
|
||||
<Marker>{{ marker }}</Marker>
|
||||
{% endif %}
|
||||
<AttachedPolicies>
|
||||
{% for policy in policies %}
|
||||
<member>
|
||||
<PolicyName>{{ policy.name }}</PolicyName>
|
||||
<PolicyArn>{{ policy.arn }}</PolicyArn>
|
||||
</member>
|
||||
{% endfor %}
|
||||
</AttachedPolicies>
|
||||
</ListAttachedUserPoliciesResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</ListAttachedUserPoliciesResponse>"""
|
||||
|
||||
LIST_POLICIES_TEMPLATE = """<ListPoliciesResponse>
|
||||
<ListPoliciesResult>
|
||||
{% if marker is none %}
|
||||
|
@ -599,3 +599,30 @@ def test_boto3_create_login_profile():
|
||||
|
||||
with assert_raises(ClientError):
|
||||
conn.create_login_profile(UserName='my-user', Password='Password')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_attach_detach_user_policy():
|
||||
iam = boto3.resource('iam', region_name='us-east-1')
|
||||
client = boto3.client('iam', region_name='us-east-1')
|
||||
|
||||
user = iam.create_user(UserName='test-user')
|
||||
|
||||
policy_name = 'UserAttachedPolicy'
|
||||
policy = iam.create_policy(PolicyName=policy_name,
|
||||
PolicyDocument='{"mypolicy": "test"}',
|
||||
Path='/mypolicy/',
|
||||
Description='my user attached policy')
|
||||
|
||||
client.attach_user_policy(UserName=user.name, PolicyArn=policy.arn)
|
||||
|
||||
resp = client.list_attached_user_policies(UserName=user.name)
|
||||
resp['AttachedPolicies'].should.have.length_of(1)
|
||||
attached_policy = resp['AttachedPolicies'][0]
|
||||
attached_policy['PolicyArn'].should.equal(policy.arn)
|
||||
attached_policy['PolicyName'].should.equal(policy_name)
|
||||
|
||||
client.detach_user_policy(UserName=user.name, PolicyArn=policy.arn)
|
||||
|
||||
resp = client.list_attached_user_policies(UserName=user.name)
|
||||
resp['AttachedPolicies'].should.have.length_of(0)
|
||||
|
Loading…
Reference in New Issue
Block a user