commit
2f66709583
@ -544,6 +544,43 @@ class SecurityGroup(object):
|
||||
def physical_resource_id(self):
|
||||
return self.id
|
||||
|
||||
def matches_filters(self, filters):
|
||||
result = True
|
||||
|
||||
def to_attr(filter_name):
|
||||
attr = None
|
||||
|
||||
if attr == 'group-name':
|
||||
attr = 'name'
|
||||
elif attr == 'group-id':
|
||||
attr = 'id'
|
||||
else:
|
||||
attr = filter_name.replace('-', '_')
|
||||
|
||||
return attr
|
||||
|
||||
for key, value in filters.items():
|
||||
ret = False
|
||||
|
||||
if key.startswith('ip-permission'):
|
||||
match = re.search(r"ip-permission.(*)", key)
|
||||
ingress_attr = to_attr(match.groups()[0])
|
||||
|
||||
for ingress in self.ingress_rules:
|
||||
if getattr(ingress, ingress_attr) in filters[key]:
|
||||
ret = True
|
||||
break
|
||||
else:
|
||||
attr_name = to_attr(key)
|
||||
ret = getattr(self, attr_name) in filters[key]
|
||||
|
||||
if not ret:
|
||||
break
|
||||
else:
|
||||
result = False
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class SecurityGroupBackend(object):
|
||||
|
||||
@ -566,8 +603,20 @@ class SecurityGroupBackend(object):
|
||||
self.groups[vpc_id][group_id] = group
|
||||
return group
|
||||
|
||||
def describe_security_groups(self):
|
||||
return itertools.chain(*[x.values() for x in self.groups.values()])
|
||||
def describe_security_groups(self, group_ids=None, groupnames=None, filters=None):
|
||||
all_groups = itertools.chain(*[x.values() for x in self.groups.values()])
|
||||
groups = []
|
||||
|
||||
if group_ids or groupnames or filters:
|
||||
for group in all_groups:
|
||||
if ((group_ids and group.id in group_ids) or
|
||||
(groupnames and group.name in groupnames) or
|
||||
(filters and group.matches_filters(filters))):
|
||||
groups.append(group)
|
||||
else:
|
||||
groups = all_groups
|
||||
|
||||
return groups
|
||||
|
||||
def delete_security_group(self, name=None, group_id=None):
|
||||
if group_id:
|
||||
|
@ -3,6 +3,7 @@ from jinja2 import Template
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
from moto.ec2.models import ec2_backend
|
||||
from moto.ec2.utils import filters_from_querystring
|
||||
|
||||
|
||||
def process_rules_from_querystring(querystring):
|
||||
@ -35,6 +36,22 @@ def process_rules_from_querystring(querystring):
|
||||
return (name, group_id, ip_protocol, from_port, to_port, ip_ranges, source_groups, source_group_ids)
|
||||
|
||||
|
||||
def process_group_ids_from_querystring(querystring):
|
||||
group_ids = []
|
||||
for key, value in querystring.items():
|
||||
if 'GroupId' in key:
|
||||
group_ids.append(value[0])
|
||||
return group_ids
|
||||
|
||||
|
||||
def process_groupnames_from_querystring(querystring):
|
||||
groupnames = []
|
||||
for key, value in querystring.items():
|
||||
if 'GroupName' in key:
|
||||
groupnames.append(value[0])
|
||||
return groupnames
|
||||
|
||||
|
||||
class SecurityGroups(BaseResponse):
|
||||
def authorize_security_group_egress(self):
|
||||
raise NotImplementedError('SecurityGroups.authorize_security_group_egress is not yet implemented')
|
||||
@ -65,7 +82,16 @@ class SecurityGroups(BaseResponse):
|
||||
return DELETE_GROUP_RESPONSE
|
||||
|
||||
def describe_security_groups(self):
|
||||
groups = ec2_backend.describe_security_groups()
|
||||
groupnames = process_groupnames_from_querystring(self.querystring)
|
||||
group_ids = process_group_ids_from_querystring(self.querystring)
|
||||
filters = filters_from_querystring(self.querystring)
|
||||
|
||||
groups = ec2_backend.describe_security_groups(
|
||||
group_ids=group_ids,
|
||||
groupnames=groupnames,
|
||||
filters=filters
|
||||
)
|
||||
|
||||
template = Template(DESCRIBE_SECURITY_GROUPS_RESPONSE)
|
||||
return template.render(groups=groups)
|
||||
|
||||
|
@ -1,7 +1,9 @@
|
||||
from __future__ import unicode_literals
|
||||
from moto.core import BaseBackend
|
||||
|
||||
from .utils import random_resource_id
|
||||
from boto.exception import BotoServerError
|
||||
from moto.core import BaseBackend
|
||||
from .utils import random_access_key, random_alphanumeric, random_resource_id
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class Role(object):
|
||||
@ -65,12 +67,91 @@ class Certificate(object):
|
||||
return self.name
|
||||
|
||||
|
||||
class AccessKey(object):
|
||||
def __init__(self, user_name):
|
||||
self.user_name = user_name
|
||||
self.access_key_id = random_access_key()
|
||||
self.secret_access_key = random_alphanumeric(32)
|
||||
self.status = 'Active'
|
||||
self.create_date = datetime.strftime(
|
||||
datetime.utcnow(),
|
||||
"%Y-%m-%d-%H-%M-%S"
|
||||
)
|
||||
|
||||
|
||||
class Group(object):
|
||||
def __init__(self, name, path='/'):
|
||||
self.name = name
|
||||
self.id = random_resource_id()
|
||||
self.path = path
|
||||
self.created = datetime.strftime(
|
||||
datetime.utcnow(),
|
||||
"%Y-%m-%d-%H-%M-%S"
|
||||
)
|
||||
|
||||
self.users = []
|
||||
|
||||
|
||||
class User(object):
|
||||
def __init__(self, name, path='/'):
|
||||
self.name = name
|
||||
self.id = random_resource_id()
|
||||
self.path = path
|
||||
self.created = datetime.strftime(
|
||||
datetime.utcnow(),
|
||||
"%Y-%m-%d-%H-%M-%S"
|
||||
)
|
||||
|
||||
self.policies = {}
|
||||
self.access_keys = []
|
||||
|
||||
def get_policy(self, policy_name):
|
||||
policy_json = None
|
||||
try:
|
||||
policy_json = self.policies[policy_name]
|
||||
except:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
return {
|
||||
'policy_name': policy_name,
|
||||
'policy_document': policy_json,
|
||||
'user_name': self.name,
|
||||
}
|
||||
|
||||
def put_policy(self, policy_name, policy_json):
|
||||
self.policies[policy_name] = policy_json
|
||||
|
||||
def delete_policy(self, policy_name):
|
||||
if policy_name not in self.policies:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
del self.policies[policy_name]
|
||||
|
||||
def create_access_key(self):
|
||||
access_key = AccessKey(self.name)
|
||||
self.access_keys.append(access_key)
|
||||
return access_key
|
||||
|
||||
def get_all_access_keys(self):
|
||||
return self.access_keys
|
||||
|
||||
def delete_access_key(self, access_key_id):
|
||||
for key in self.access_keys:
|
||||
if key.access_key_id == access_key_id:
|
||||
self.access_keys.remove(key)
|
||||
break
|
||||
else:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
|
||||
class IAMBackend(BaseBackend):
|
||||
|
||||
def __init__(self):
|
||||
self.instance_profiles = {}
|
||||
self.roles = {}
|
||||
self.certificates = {}
|
||||
self.groups = {}
|
||||
self.users = {}
|
||||
super(IAMBackend, self).__init__()
|
||||
|
||||
def create_role(self, role_name, assume_role_policy_document, path, policies):
|
||||
@ -125,4 +206,119 @@ class IAMBackend(BaseBackend):
|
||||
if name == cert.cert_name:
|
||||
return cert
|
||||
|
||||
def create_group(self, group_name, path='/'):
|
||||
if group_name in self.groups:
|
||||
raise BotoServerError(409, 'Conflict')
|
||||
|
||||
group = Group(group_name, path)
|
||||
self.groups[group_name] = group
|
||||
return group
|
||||
|
||||
def get_group(self, group_name, marker=None, max_items=None):
|
||||
group = None
|
||||
try:
|
||||
group = self.groups[group_name]
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
return group
|
||||
|
||||
def create_user(self, user_name, path='/'):
|
||||
if user_name in self.users:
|
||||
raise BotoServerError(409, 'Conflict')
|
||||
|
||||
user = User(user_name, path)
|
||||
self.users[user_name] = user
|
||||
return user
|
||||
|
||||
def get_user(self, user_name):
|
||||
user = None
|
||||
try:
|
||||
user = self.users[user_name]
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
return user
|
||||
|
||||
def add_user_to_group(self, group_name, user_name):
|
||||
group = None
|
||||
user = None
|
||||
|
||||
try:
|
||||
group = self.groups[group_name]
|
||||
user = self.users[user_name]
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
group.users.append(user)
|
||||
|
||||
def remove_user_from_group(self, group_name, user_name):
|
||||
group = None
|
||||
user = None
|
||||
|
||||
try:
|
||||
group = self.groups[group_name]
|
||||
user = self.users[user_name]
|
||||
group.users.remove(user)
|
||||
except (KeyError, ValueError):
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
def get_user_policy(self, user_name, policy_name):
|
||||
policy = None
|
||||
try:
|
||||
user = self.users[user_name]
|
||||
policy = user.get_policy(policy_name)
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
return policy
|
||||
|
||||
def put_user_policy(self, user_name, policy_name, policy_json):
|
||||
try:
|
||||
user = self.users[user_name]
|
||||
user.put_policy(policy_name, policy_json)
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
def delete_user_policy(self, user_name, policy_name):
|
||||
try:
|
||||
user = self.users[user_name]
|
||||
user.delete_policy(policy_name)
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
def create_access_key(self, user_name=None):
|
||||
key = None
|
||||
try:
|
||||
user = self.users[user_name]
|
||||
key = user.create_access_key()
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
return key
|
||||
|
||||
def get_all_access_keys(self, user_name, marker=None, max_items=None):
|
||||
keys = None
|
||||
try:
|
||||
user = self.users[user_name]
|
||||
keys = user.get_all_access_keys()
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
return keys
|
||||
|
||||
def delete_access_key(self, access_key_id, user_name):
|
||||
try:
|
||||
user = self.users[user_name]
|
||||
user.delete_access_key(access_key_id)
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
def delete_user(self, user_name):
|
||||
try:
|
||||
del self.users[user_name]
|
||||
except KeyError:
|
||||
raise BotoServerError(404, 'Not Found')
|
||||
|
||||
|
||||
iam_backend = IAMBackend()
|
||||
|
@ -83,6 +83,114 @@ class IamResponse(BaseResponse):
|
||||
template = Template(GET_SERVER_CERTIFICATE_TEMPLATE)
|
||||
return template.render(certificate=cert)
|
||||
|
||||
def create_group(self):
|
||||
group_name = self._get_param('GroupName')
|
||||
path = self._get_param('Path')
|
||||
|
||||
group = iam_backend.create_group(group_name, path)
|
||||
template = Template(CREATE_GROUP_TEMPLATE)
|
||||
return template.render(group=group)
|
||||
|
||||
def get_group(self):
|
||||
group_name = self._get_param('GroupName')
|
||||
|
||||
group = iam_backend.get_group(group_name)
|
||||
template = Template(GET_GROUP_TEMPLATE)
|
||||
return template.render(group=group)
|
||||
|
||||
def create_user(self):
|
||||
user_name = self._get_param('UserName')
|
||||
path = self._get_param('Path')
|
||||
|
||||
user = iam_backend.create_user(user_name, path)
|
||||
template = Template(USER_TEMPLATE)
|
||||
return template.render(action='Create', user=user)
|
||||
|
||||
def get_user(self):
|
||||
user_name = self._get_param('UserName')
|
||||
user = iam_backend.get_user(user_name)
|
||||
template = Template(USER_TEMPLATE)
|
||||
return template.render(action='Get', user=user)
|
||||
|
||||
def add_user_to_group(self):
|
||||
group_name = self._get_param('GroupName')
|
||||
user_name = self._get_param('UserName')
|
||||
|
||||
iam_backend.add_user_to_group(group_name, user_name)
|
||||
template = Template(GENERIC_EMPTY_TEMPLATE)
|
||||
return template.render(name='AddUserToGroup')
|
||||
|
||||
def remove_user_from_group(self):
|
||||
group_name = self._get_param('GroupName')
|
||||
user_name = self._get_param('UserName')
|
||||
|
||||
iam_backend.remove_user_from_group(group_name, user_name)
|
||||
template = Template(GENERIC_EMPTY_TEMPLATE)
|
||||
return template.render(name='RemoveUserFromGroup')
|
||||
|
||||
def get_user_policy(self):
|
||||
user_name = self._get_param('UserName')
|
||||
policy_name = self._get_param('PolicyName')
|
||||
|
||||
policy_document = iam_backend.get_user_policy(user_name, policy_name)
|
||||
template = Template(GET_USER_POLICY_TEMPLATE)
|
||||
return template.render(
|
||||
user_name=user_name,
|
||||
policy_name=policy_name,
|
||||
policy_document=policy_document
|
||||
)
|
||||
|
||||
def put_user_policy(self):
|
||||
user_name = self._get_param('UserName')
|
||||
policy_name = self._get_param('PolicyName')
|
||||
policy_document = self._get_param('PolicyDocument')
|
||||
|
||||
iam_backend.put_user_policy(user_name, policy_name, policy_document)
|
||||
template = Template(GENERIC_EMPTY_TEMPLATE)
|
||||
return template.render(name='PutUserPolicy')
|
||||
|
||||
def delete_user_policy(self):
|
||||
user_name = self._get_param('UserName')
|
||||
policy_name = self._get_param('PolicyName')
|
||||
|
||||
iam_backend.delete_user_policy(user_name, policy_name)
|
||||
template = Template(GENERIC_EMPTY_TEMPLATE)
|
||||
return template.render(name='DeleteUserPolicy')
|
||||
|
||||
def create_access_key(self):
|
||||
user_name = self._get_param('UserName')
|
||||
|
||||
key = iam_backend.create_access_key(user_name)
|
||||
template = Template(CREATE_ACCESS_KEY_TEMPLATE)
|
||||
return template.render(key=key)
|
||||
|
||||
def list_access_keys(self):
|
||||
user_name = self._get_param('UserName')
|
||||
|
||||
keys = iam_backend.get_all_access_keys(user_name)
|
||||
template = Template(LIST_ACCESS_KEYS_TEMPLATE)
|
||||
return template.render(user_name=user_name, keys=keys)
|
||||
|
||||
def delete_access_key(self):
|
||||
user_name = self._get_param('UserName')
|
||||
access_key_id = self._get_param('AccessKeyId')
|
||||
|
||||
iam_backend.delete_access_key(access_key_id, user_name)
|
||||
template = Template(GENERIC_EMPTY_TEMPLATE)
|
||||
return template.render(name='DeleteAccessKey')
|
||||
|
||||
def delete_user(self):
|
||||
user_name = self._get_param('UserName')
|
||||
iam_backend.delete_user(user_name)
|
||||
template = Template(GENERIC_EMPTY_TEMPLATE)
|
||||
return template.render(name='DeleteUser')
|
||||
|
||||
|
||||
GENERIC_EMPTY_TEMPLATE = """<{{ name }}Response>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</{{ name }}Response>"""
|
||||
|
||||
CREATE_INSTANCE_PROFILE_TEMPLATE = """<CreateInstanceProfileResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
|
||||
<CreateInstanceProfileResult>
|
||||
@ -275,3 +383,106 @@ GET_SERVER_CERTIFICATE_TEMPLATE = """<GetServerCertificateResponse>
|
||||
</ResponseMetadata>
|
||||
</GetServerCertificateResponse>"""
|
||||
|
||||
CREATE_GROUP_TEMPLATE = """<CreateGroupResponse>
|
||||
<CreateGroupResult>
|
||||
<Group>
|
||||
<Path>{{ group.path }}</Path>
|
||||
<GroupName>{{ group.name }}</GroupName>
|
||||
<GroupId>{{ group.id }}</GroupId>
|
||||
<Arn>arn:aws:iam::123456789012:group/{{ group.path }}</Arn>
|
||||
</Group>
|
||||
</CreateGroupResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</CreateGroupResponse>"""
|
||||
|
||||
GET_GROUP_TEMPLATE = """<GetGroupResponse>
|
||||
<GetGroupResult>
|
||||
<Group>
|
||||
<Path>{{ group.path }}</Path>
|
||||
<GroupName>{{ group.name }}</GroupName>
|
||||
<GroupId>{{ group.id }}</GroupId>
|
||||
<Arn>arn:aws:iam::123456789012:group/{{ group.path }}</Arn>
|
||||
</Group>
|
||||
<Users>
|
||||
{% for user in group.users %}
|
||||
<member>
|
||||
<Path>{{ user.path }}</Path>
|
||||
<UserName>{{ user.name }}</UserName>
|
||||
<UserId>{{ user.id }}</UserId>
|
||||
<Arn>
|
||||
arn:aws:iam::123456789012:user/{{ user.path }}/{{ user.name}}
|
||||
</Arn>
|
||||
</member>
|
||||
{% endfor %}
|
||||
</Users>
|
||||
<IsTruncated>false</IsTruncated>
|
||||
</GetGroupResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</GetGroupResponse>"""
|
||||
|
||||
USER_TEMPLATE = """<{{ action }}UserResponse>
|
||||
<{{ action }}UserResult>
|
||||
<User>
|
||||
<Path>{{ user.path }}</Path>
|
||||
<UserName>{{ user.name }}</UserName>
|
||||
<UserId>{{ user.id }}</UserId>
|
||||
<Arn>arn:aws:iam::123456789012:user/{{ user.path }}/{{ user.name }}
|
||||
</Arn>
|
||||
</User>
|
||||
</{{ action }}UserResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</{{ action }}UserResponse>"""
|
||||
|
||||
GET_USER_POLICY_TEMPLATE = """<GetUserPolicyResponse>
|
||||
<GetUserPolicyResult>
|
||||
<UserName>{{ user_name }}</UserName>
|
||||
<PolicyName>{{ policy_name }}</PolicyName>
|
||||
<PolicyDocument>
|
||||
{{ policy_document }}
|
||||
</PolicyDocument>
|
||||
</GetUserPolicyResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</GetUserPolicyResponse>"""
|
||||
|
||||
CREATE_ACCESS_KEY_TEMPLATE = """<CreateAccessKeyResponse>
|
||||
<CreateAccessKeyResult>
|
||||
<AccessKey>
|
||||
<UserName>{{ key.user_name }}</UserName>
|
||||
<AccessKeyId>{{ key.access_key_id }}</AccessKeyId>
|
||||
<Status>{{ key.status }}</Status>
|
||||
<SecretAccessKey>
|
||||
{{ key.secret_access_key }}
|
||||
</SecretAccessKey>
|
||||
</AccessKey>
|
||||
</CreateAccessKeyResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</CreateAccessKeyResponse>"""
|
||||
|
||||
LIST_ACCESS_KEYS_TEMPLATE = """<ListAccessKeysResponse>
|
||||
<ListAccessKeysResult>
|
||||
<UserName>{{ user_name }}</UserName>
|
||||
<AccessKeyMetadata>
|
||||
{% for key in keys %}
|
||||
<member>
|
||||
<UserName>{{ user_name }}</UserName>
|
||||
<AccessKeyId>{{ key.access_key_id }}</AccessKeyId>
|
||||
<Status>{{ key.status }}</Status>
|
||||
</member>
|
||||
{% endfor %}
|
||||
</AccessKeyMetadata>
|
||||
<IsTruncated>false</IsTruncated>
|
||||
</ListAccessKeysResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</ListAccessKeysResponse>"""
|
||||
|
@ -4,8 +4,24 @@ import string
|
||||
import six
|
||||
|
||||
|
||||
def random_alphanumeric(length):
|
||||
return ''.join(six.text_type(
|
||||
random.choice(
|
||||
string.ascii_letters + string.digits
|
||||
)) for _ in range(length)
|
||||
)
|
||||
|
||||
|
||||
def random_resource_id():
|
||||
size = 20
|
||||
chars = list(range(10)) + list(string.ascii_lowercase)
|
||||
|
||||
return ''.join(six.text_type(random.choice(chars)) for x in range(size))
|
||||
|
||||
|
||||
def random_access_key():
|
||||
return ''.join(six.text_type(
|
||||
random.choice(
|
||||
string.ascii_uppercase + string.digits
|
||||
)) for _ in range(16)
|
||||
)
|
||||
|
@ -195,3 +195,22 @@ def test_authorize_group_in_vpc():
|
||||
# And check that it gets revoked
|
||||
security_group = [group for group in conn.get_all_security_groups() if group.name == 'test1'][0]
|
||||
security_group.rules.should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_ec2
|
||||
def test_get_all_security_groups():
|
||||
conn = boto.connect_ec2()
|
||||
conn.create_security_group(name='test1', description='test1', vpc_id='vpc-mjm05d27')
|
||||
conn.create_security_group(name='test2', description='test2')
|
||||
|
||||
resp = conn.get_all_security_groups(groupnames=['test1'])
|
||||
resp.should.have.length_of(1)
|
||||
|
||||
resp = conn.get_all_security_groups(filters={'vpc_id': ['vpc-mjm05d27']})
|
||||
resp.should.have.length_of(1)
|
||||
|
||||
resp = conn.get_all_security_groups(filters={'description': ['test1']})
|
||||
resp.should.have.length_of(1)
|
||||
|
||||
resp = conn.get_all_security_groups()
|
||||
resp.should.have.length_of(2)
|
||||
|
@ -1,8 +1,10 @@
|
||||
from __future__ import unicode_literals
|
||||
import boto
|
||||
|
||||
import sure # noqa
|
||||
|
||||
from nose.tools import assert_raises, assert_equals, assert_not_equals
|
||||
from boto.exception import BotoServerError
|
||||
|
||||
from moto import mock_iam
|
||||
|
||||
|
||||
@ -58,3 +60,105 @@ def test_create_role_and_instance_profile():
|
||||
|
||||
conn.list_roles().roles[0].role_name.should.equal('my-role')
|
||||
conn.list_instance_profiles().instance_profiles[0].instance_profile_name.should.equal("my-profile")
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_create_group():
|
||||
conn = boto.connect_iam()
|
||||
conn.create_group('my-group')
|
||||
with assert_raises(BotoServerError):
|
||||
conn.create_group('my-group')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_get_group():
|
||||
conn = boto.connect_iam()
|
||||
conn.create_group('my-group')
|
||||
conn.get_group('my-group')
|
||||
with assert_raises(BotoServerError):
|
||||
conn.get_group('not-group')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_create_user():
|
||||
conn = boto.connect_iam()
|
||||
conn.create_user('my-user')
|
||||
with assert_raises(BotoServerError):
|
||||
conn.create_user('my-user')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_get_user():
|
||||
conn = boto.connect_iam()
|
||||
with assert_raises(BotoServerError):
|
||||
conn.get_user('my-user')
|
||||
conn.create_user('my-user')
|
||||
conn.get_user('my-user')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_add_user_to_group():
|
||||
conn = boto.connect_iam()
|
||||
with assert_raises(BotoServerError):
|
||||
conn.add_user_to_group('my-group', 'my-user')
|
||||
conn.create_group('my-group')
|
||||
with assert_raises(BotoServerError):
|
||||
conn.add_user_to_group('my-group', 'my-user')
|
||||
conn.create_user('my-user')
|
||||
conn.add_user_to_group('my-group', 'my-user')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_remove_user_from_group():
|
||||
conn = boto.connect_iam()
|
||||
with assert_raises(BotoServerError):
|
||||
conn.remove_user_from_group('my-group', 'my-user')
|
||||
conn.create_group('my-group')
|
||||
conn.create_user('my-user')
|
||||
with assert_raises(BotoServerError):
|
||||
conn.remove_user_from_group('my-group', 'my-user')
|
||||
conn.add_user_to_group('my-group', 'my-user')
|
||||
conn.remove_user_from_group('my-group', 'my-user')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_create_access_key():
|
||||
conn = boto.connect_iam()
|
||||
with assert_raises(BotoServerError):
|
||||
conn.create_access_key('my-user')
|
||||
conn.create_user('my-user')
|
||||
conn.create_access_key('my-user')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_get_all_access_keys():
|
||||
conn = boto.connect_iam()
|
||||
conn.create_user('my-user')
|
||||
response = conn.get_all_access_keys('my-user')
|
||||
assert_equals(
|
||||
response['list_access_keys_response']['list_access_keys_result']['access_key_metadata'],
|
||||
[]
|
||||
)
|
||||
conn.create_access_key('my-user')
|
||||
response = conn.get_all_access_keys('my-user')
|
||||
assert_not_equals(
|
||||
response['list_access_keys_response']['list_access_keys_result']['access_key_metadata'],
|
||||
[]
|
||||
)
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_delete_access_key():
|
||||
conn = boto.connect_iam()
|
||||
conn.create_user('my-user')
|
||||
access_key_id = conn.create_access_key('my-user')['create_access_key_response']['create_access_key_result']['access_key']['access_key_id']
|
||||
conn.delete_access_key(access_key_id, 'my-user')
|
||||
|
||||
|
||||
@mock_iam()
|
||||
def test_delete_user():
|
||||
conn = boto.connect_iam()
|
||||
with assert_raises(BotoServerError):
|
||||
conn.delete_user('my-user')
|
||||
conn.create_user('my-user')
|
||||
conn.delete_user('my-user')
|
||||
|
Loading…
Reference in New Issue
Block a user