iam: add group policy methods (#849)

Implemented mocks for:
* get_all_group_policies
* list_group_policies (boto3)
* get_group_policy
* put_group_policy
This commit is contained in:
Léo Cavaillé 2017-03-05 04:56:36 +01:00 committed by Steve Pulec
parent 56f9409ca9
commit 9b6d3983d2
3 changed files with 125 additions and 0 deletions

View File

@ -167,6 +167,7 @@ class Group(object):
)
self.users = []
self.policies = {}
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@ -174,6 +175,24 @@ class Group(object):
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "Arn" ]"')
raise UnformattedGetAttTemplateException()
def get_policy(self, policy_name):
try:
policy_json = self.policies[policy_name]
except KeyError:
raise IAMNotFoundException("Policy {0} not found".format(policy_name))
return {
'policy_name': policy_name,
'policy_document': policy_json,
'group_name': self.name,
}
def put_policy(self, policy_name, policy_json):
self.policies[policy_name] = policy_json
def list_policies(self):
return self.policies.keys()
class User(object):
def __init__(self, name, path=None):
@ -573,6 +592,18 @@ class IAMBackend(BaseBackend):
return groups
def put_group_policy(self, group_name, policy_name, policy_json):
group = self.get_group(group_name)
group.put_policy(policy_name, policy_json)
def list_group_policies(self, group_name, marker=None, max_items=None):
group = self.get_group(group_name)
return group.list_policies()
def get_group_policy(self, group_name, policy_name):
group = self.get_group(group_name)
return group.get_policy(policy_name)
def create_user(self, user_name, path='/'):
if user_name in self.users:
raise IAMConflictException("EntityAlreadyExists", "User {0} already exists".format(user_name))

View File

@ -186,6 +186,32 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_GROUPS_FOR_USER_TEMPLATE)
return template.render(groups=groups)
def put_group_policy(self):
group_name = self._get_param('GroupName')
policy_name = self._get_param('PolicyName')
policy_document = self._get_param('PolicyDocument')
iam_backend.put_group_policy(group_name, policy_name, policy_document)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="PutGroupPolicyResponse")
def list_group_policies(self):
group_name = self._get_param('GroupName')
marker = self._get_param('Marker')
max_items = self._get_param('MaxItems')
policies = iam_backend.list_group_policies(group_name,
marker=marker, max_items=max_items)
template = self.response_template(LIST_GROUP_POLICIES_TEMPLATE)
return template.render(name="ListGroupPoliciesResponse",
policies=policies,
marker=marker)
def get_group_policy(self):
group_name = self._get_param('GroupName')
policy_name = self._get_param('PolicyName')
policy_result = iam_backend.get_group_policy(group_name, policy_name)
template = self.response_template(GET_GROUP_POLICY_TEMPLATE)
return template.render(name="GetGroupPolicyResponse", **policy_result)
def create_user(self):
user_name = self._get_param('UserName')
path = self._get_param('Path')
@ -194,6 +220,7 @@ class IamResponse(BaseResponse):
template = self.response_template(USER_TEMPLATE)
return template.render(action='Create', user=user)
def get_user(self):
user_name = self._get_param('UserName')
user = iam_backend.get_user(user_name)
@ -699,6 +726,35 @@ LIST_GROUPS_FOR_USER_TEMPLATE = """<ListGroupsForUserResponse>
</ResponseMetadata>
</ListGroupsForUserResponse>"""
LIST_GROUP_POLICIES_TEMPLATE = """<ListGroupPoliciesResponse>
<ListGroupPoliciesResult>
{% if marker is none %}
<IsTruncated>false</IsTruncated>
{% else %}
<IsTruncated>true</IsTruncated>
<Marker>{{ marker }}</Marker>
{% endif %}
<PolicyNames>
{% for policy in policies %}
<member>{{ policy }}</member>
{% endfor %}
</PolicyNames>
</ListGroupPoliciesResult>
<ResponseMetadata>
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
</ResponseMetadata>
</ListGroupPoliciesResponse>"""
GET_GROUP_POLICY_TEMPLATE = """<GetGroupPolicyResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<GetGroupPolicyResult>
<PolicyName>{{ policy_name }}</PolicyName>
<GroupName>{{ group_name }}</GroupName>
<PolicyDocument>{{ policy_document }}</PolicyDocument>
</GetGroupPolicyResult>
<ResponseMetadata>
<RequestId>7e7cd8bc-99ef-11e1-a4c3-27EXAMPLE804</RequestId>
</ResponseMetadata>
</GetGroupPolicyResponse>"""
USER_TEMPLATE = """<{{ action }}UserResponse>
<{{ action }}UserResult>

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import boto
import boto3
import sure # noqa
from nose.tools import assert_raises
@ -70,3 +71,40 @@ def test_get_groups_for_user():
groups = conn.get_groups_for_user('my-user')['list_groups_for_user_response']['list_groups_for_user_result']['groups']
groups.should.have.length_of(2)
@mock_iam()
def test_put_group_policy():
conn = boto.connect_iam()
conn.create_group('my-group')
conn.put_group_policy('my-group', 'my-policy', '{"some": "json"}')
@mock_iam()
def test_get_group_policy():
conn = boto.connect_iam()
conn.create_group('my-group')
with assert_raises(BotoServerError):
conn.get_group_policy('my-group', 'my-policy')
conn.put_group_policy('my-group', 'my-policy', '{"some": "json"}')
policy = conn.get_group_policy('my-group', 'my-policy')
@mock_iam()
def test_get_all_group_policies():
conn = boto.connect_iam()
conn.create_group('my-group')
policies = conn.get_all_group_policies('my-group')['list_group_policies_response']['list_group_policies_result']['policy_names']
assert policies == []
conn.put_group_policy('my-group', 'my-policy', '{"some": "json"}')
policies = conn.get_all_group_policies('my-group')['list_group_policies_response']['list_group_policies_result']['policy_names']
assert policies == ['my-policy']
@mock_iam()
def test_list_group_policies():
conn = boto3.client('iam')
conn.create_group(GroupName='my-group')
policies = conn.list_group_policies(GroupName='my-group')['PolicyNames'].should.be.empty
conn.put_group_policy(GroupName='my-group', PolicyName='my-policy', PolicyDocument='{"some": "json"}')
policies = conn.list_group_policies(GroupName='my-group')['PolicyNames'].should.equal(['my-policy'])