Merge branch 'master' into batch

This commit is contained in:
Terry Cain 2017-10-03 19:37:53 +01:00
commit a027f86cc8
No known key found for this signature in database
GPG Key ID: 14D90844E4E9B9F3
16 changed files with 709 additions and 172 deletions

View File

@ -3,7 +3,23 @@ Moto Changelog
Latest
------
1.1.16
1.1.21
-----
* ELBv2 bugfixes
* Removing GPL'd dependency
1.1.20
-----
* Improved `make scaffold`
* Implemented IAM attached group policies
* Implemented skeleton of Cloudwatch Logs
* Redshift: fixed multi-params
* Redshift: implement taggable resources
* Lambda + SNS: Major enhancements
1.1.19
-----
* Fixing regression from 1.1.15

View File

@ -132,6 +132,7 @@ class LambdaFunction(BaseModel):
self.logs_backend = logs_backends[self.region]
self.environment_vars = spec.get('Environment', {}).get('Variables', {})
self.docker_client = docker.from_env()
self.policy = ""
# Unfortunately mocking replaces this method w/o fallback enabled, so we
# need to replace it if we detect it's been mocked
@ -527,6 +528,9 @@ class LambdaBackend(BaseBackend):
pass
# Don't care
def add_policy(self, function_name, policy):
self.get_function(function_name).policy = policy
def do_validate_s3():
return os.environ.get('VALIDATE_LAMBDA_S3', '') in ['', '1', 'true']

View File

@ -57,6 +57,35 @@ class LambdaResponse(BaseResponse):
else:
raise ValueError("Cannot handle {0} request".format(request.method))
def policy(self, request, full_url, headers):
if request.method == 'GET':
return self._get_policy(request, full_url, headers)
if request.method == 'POST':
return self._add_policy(request, full_url, headers)
def _add_policy(self, request, full_url, headers):
lambda_backend = self.get_lambda_backend(full_url)
path = request.path if hasattr(request, 'path') else request.path_url
function_name = path.split('/')[-2]
if lambda_backend.has_function(function_name):
policy = request.body.decode('utf8')
lambda_backend.add_policy(function_name, policy)
return 200, {}, json.dumps(dict(Statement=policy))
else:
return 404, {}, "{}"
def _get_policy(self, request, full_url, headers):
lambda_backend = self.get_lambda_backend(full_url)
path = request.path if hasattr(request, 'path') else request.path_url
function_name = path.split('/')[-2]
if lambda_backend.has_function(function_name):
function = lambda_backend.get_function(function_name)
return 200, {}, json.dumps(dict(Policy="{\"Statement\":[" + function.policy + "]}"))
else:
return 404, {}, "{}"
def _invoke(self, request, full_url):
response_headers = {}
lambda_backend = self.get_lambda_backend(full_url)

View File

@ -12,5 +12,6 @@ url_paths = {
r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/?$': response.function,
r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invocations/?$': response.invoke,
r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invoke-async/?$': response.invoke_async,
r'{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)': response.tag
r'{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)': response.tag,
r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/policy/?$': response.policy
}

View File

@ -152,6 +152,13 @@ class InvalidDescribeRulesRequest(ELBClientError):
)
class ResourceInUseError(ELBClientError):
def __init__(self, msg="A specified resource is in use"):
super(ResourceInUseError, self).__init__(
"ResourceInUse", msg)
class RuleNotFoundError(ELBClientError):
def __init__(self):

View File

@ -21,6 +21,7 @@ from .exceptions import (
InvalidActionTypeError,
ActionTargetGroupNotFoundError,
InvalidDescribeRulesRequest,
ResourceInUseError,
RuleNotFoundError,
DuplicatePriorityError,
InvalidTargetGroupNameError,
@ -426,10 +427,17 @@ class ELBv2Backend(BaseBackend):
# however, boto3 does't raise error even if rule is not found
def delete_target_group(self, target_group_arn):
target_group = self.target_groups.pop(target_group_arn, None)
if target_group_arn not in self.target_groups:
raise TargetGroupNotFoundError()
target_group = self.target_groups[target_group_arn]
if target_group:
if self._any_listener_using(target_group_arn):
raise ResourceInUseError(
"The target group '{}' is currently in use by a listener or a rule".format(
target_group_arn))
del self.target_groups[target_group_arn]
return target_group
raise TargetGroupNotFoundError()
def delete_listener(self, listener_arn):
for load_balancer in self.load_balancers.values():
@ -539,6 +547,15 @@ class ELBv2Backend(BaseBackend):
modified_rules.append(given_rule)
return modified_rules
def _any_listener_using(self, target_group_arn):
for load_balancer in self.load_balancers.values():
for listener in load_balancer.listeners.values():
for rule in listener.rules:
for action in rule.actions:
if action.get('target_group_arn') == target_group_arn:
return True
return False
elbv2_backends = {}
for region in ec2_backends.keys():

View File

@ -74,21 +74,13 @@ class ManagedPolicy(Policy):
is_attachable = True
def attach_to_role(self, role):
def attach_to(self, obj):
self.attachment_count += 1
role.managed_policies[self.name] = self
obj.managed_policies[self.name] = self
def detach_from_role(self, role):
def detach_from(self, obj):
self.attachment_count -= 1
del role.managed_policies[self.name]
def attach_to_user(self, user):
self.attachment_count += 1
user.managed_policies[self.name] = self
def detach_from_user(self, user):
self.attachment_count -= 1
del user.managed_policies[self.name]
del obj.managed_policies[self.name]
class AWSManagedPolicy(ManagedPolicy):
@ -249,6 +241,7 @@ class Group(BaseModel):
)
self.users = []
self.managed_policies = {}
self.policies = {}
def get_cfn_attribute(self, attribute_name):
@ -423,25 +416,47 @@ class IAMBackend(BaseBackend):
def attach_role_policy(self, policy_arn, role_name):
arns = dict((p.arn, p) for p in self.managed_policies.values())
policy = arns[policy_arn]
policy.attach_to_role(self.get_role(role_name))
policy.attach_to(self.get_role(role_name))
def detach_role_policy(self, policy_arn, role_name):
arns = dict((p.arn, p) for p in self.managed_policies.values())
try:
policy = arns[policy_arn]
policy.detach_from_role(self.get_role(role_name))
policy.detach_from(self.get_role(role_name))
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
def attach_group_policy(self, policy_arn, group_name):
arns = dict((p.arn, p) for p in self.managed_policies.values())
try:
policy = arns[policy_arn]
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
policy.attach_to(self.get_group(group_name))
def detach_group_policy(self, policy_arn, group_name):
arns = dict((p.arn, p) for p in self.managed_policies.values())
try:
policy = arns[policy_arn]
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
policy.detach_from(self.get_group(group_name))
def attach_user_policy(self, policy_arn, user_name):
arns = dict((p.arn, p) for p in self.managed_policies.values())
policy = arns[policy_arn]
policy.attach_to_user(self.get_user(user_name))
try:
policy = arns[policy_arn]
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
policy.attach_to(self.get_user(user_name))
def detach_user_policy(self, policy_arn, user_name):
arns = dict((p.arn, p) for p in self.managed_policies.values())
policy = arns[policy_arn]
policy.detach_from_user(self.get_user(user_name))
try:
policy = arns[policy_arn]
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
policy.detach_from(self.get_user(user_name))
def create_policy(self, description, path, policy_document, policy_name):
policy = ManagedPolicy(
@ -458,39 +473,15 @@ class IAMBackend(BaseBackend):
def list_attached_role_policies(self, role_name, marker=None, max_items=100, path_prefix='/'):
policies = self.get_role(role_name).managed_policies.values()
return self._filter_attached_policies(policies, marker, max_items, path_prefix)
if path_prefix:
policies = [p for p in policies if p.path.startswith(path_prefix)]
policies = sorted(policies, key=lambda policy: policy.name)
start_idx = int(marker) if marker else 0
policies = policies[start_idx:start_idx + max_items]
if len(policies) < max_items:
marker = None
else:
marker = str(start_idx + max_items)
return policies, marker
def list_attached_group_policies(self, group_name, marker=None, max_items=100, path_prefix='/'):
policies = self.get_group(group_name).managed_policies.values()
return self._filter_attached_policies(policies, marker, max_items, path_prefix)
def list_attached_user_policies(self, user_name, marker=None, max_items=100, path_prefix='/'):
policies = self.get_user(user_name).managed_policies.values()
if path_prefix:
policies = [p for p in policies if p.path.startswith(path_prefix)]
policies = sorted(policies, key=lambda policy: policy.name)
start_idx = int(marker) if marker else 0
policies = policies[start_idx:start_idx + max_items]
if len(policies) < max_items:
marker = None
else:
marker = str(start_idx + max_items)
return policies, marker
return self._filter_attached_policies(policies, marker, max_items, path_prefix)
def list_policies(self, marker, max_items, only_attached, path_prefix, scope):
policies = self.managed_policies.values()
@ -504,6 +495,9 @@ class IAMBackend(BaseBackend):
policies = [p for p in policies if not isinstance(
p, AWSManagedPolicy)]
return self._filter_attached_policies(policies, marker, max_items, path_prefix)
def _filter_attached_policies(self, policies, marker, max_items, path_prefix):
if path_prefix:
policies = [p for p in policies if p.path.startswith(path_prefix)]

View File

@ -20,6 +20,20 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DetachRolePolicyResponse")
def attach_group_policy(self):
policy_arn = self._get_param('PolicyArn')
group_name = self._get_param('GroupName')
iam_backend.attach_group_policy(policy_arn, group_name)
template = self.response_template(ATTACH_GROUP_POLICY_TEMPLATE)
return template.render()
def detach_group_policy(self):
policy_arn = self._get_param('PolicyArn')
group_name = self._get_param('GroupName')
iam_backend.detach_group_policy(policy_arn, group_name)
template = self.response_template(DETACH_GROUP_POLICY_TEMPLATE)
return template.render()
def attach_user_policy(self):
policy_arn = self._get_param('PolicyArn')
user_name = self._get_param('UserName')
@ -54,6 +68,17 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_ATTACHED_ROLE_POLICIES_TEMPLATE)
return template.render(policies=policies, marker=marker)
def list_attached_group_policies(self):
marker = self._get_param('Marker')
max_items = self._get_int_param('MaxItems', 100)
path_prefix = self._get_param('PathPrefix', '/')
group_name = self._get_param('GroupName')
policies, marker = iam_backend.list_attached_group_policies(
group_name, marker=marker, max_items=max_items,
path_prefix=path_prefix)
template = self.response_template(LIST_ATTACHED_GROUP_POLICIES_TEMPLATE)
return template.render(policies=policies, marker=marker)
def list_attached_user_policies(self):
marker = self._get_param('Marker')
max_items = self._get_int_param('MaxItems', 100)
@ -520,6 +545,18 @@ DETACH_USER_POLICY_TEMPLATE = """<DetachUserPolicyResponse>
</ResponseMetadata>
</DetachUserPolicyResponse>"""
ATTACH_GROUP_POLICY_TEMPLATE = """<AttachGroupPolicyResponse>
<ResponseMetadata>
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
</ResponseMetadata>
</AttachGroupPolicyResponse>"""
DETACH_GROUP_POLICY_TEMPLATE = """<DetachGroupPolicyResponse>
<ResponseMetadata>
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
</ResponseMetadata>
</DetachGroupPolicyResponse>"""
CREATE_POLICY_TEMPLATE = """<CreatePolicyResponse>
<CreatePolicyResult>
<Policy>
@ -560,6 +597,28 @@ LIST_ATTACHED_ROLE_POLICIES_TEMPLATE = """<ListAttachedRolePoliciesResponse>
</ResponseMetadata>
</ListAttachedRolePoliciesResponse>"""
LIST_ATTACHED_GROUP_POLICIES_TEMPLATE = """<ListAttachedGroupPoliciesResponse>
<ListAttachedGroupPoliciesResult>
{% if marker is none %}
<IsTruncated>false</IsTruncated>
{% else %}
<IsTruncated>true</IsTruncated>
<Marker>{{ marker }}</Marker>
{% endif %}
<AttachedPolicies>
{% for policy in policies %}
<member>
<PolicyName>{{ policy.name }}</PolicyName>
<PolicyArn>{{ policy.arn }}</PolicyArn>
</member>
{% endfor %}
</AttachedPolicies>
</ListAttachedGroupPoliciesResult>
<ResponseMetadata>
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
</ResponseMetadata>
</ListAttachedGroupPoliciesResponse>"""
LIST_ATTACHED_USER_POLICIES_TEMPLATE = """<ListAttachedUserPoliciesResponse>
<ListAttachedUserPoliciesResult>
{% if marker is none %}

View File

@ -2,7 +2,8 @@ from __future__ import unicode_literals
import json
import dicttoxml
import xmltodict
from jinja2 import Template
from six import iteritems
@ -26,6 +27,24 @@ def convert_json_error_to_xml(json_error):
return template.render(code=code, message=message)
def itemize(data):
"""
The xmltodict.unparse requires we modify the shape of the input dictionary slightly. Instead of a dict of the form:
{'key': ['value1', 'value2']}
We must provide:
{'key': {'item': ['value1', 'value2']}}
"""
if isinstance(data, dict):
ret = {}
for key in data:
ret[key] = itemize(data[key])
return ret
elif isinstance(data, list):
return {'item': [itemize(value) for value in data]}
else:
return data
class RedshiftResponse(BaseResponse):
@property
@ -36,8 +55,10 @@ class RedshiftResponse(BaseResponse):
if self.request_json:
return json.dumps(response)
else:
xml = dicttoxml.dicttoxml(response, attr_type=False, root=False)
return xml.decode("utf-8")
xml = xmltodict.unparse(itemize(response), full_document=False)
if hasattr(xml, 'decode'):
xml = xml.decode('utf-8')
return xml
def call_action(self):
status, headers, body = super(RedshiftResponse, self).call_action()
@ -66,6 +87,24 @@ class RedshiftResponse(BaseResponse):
count += 1
return unpacked_list
def _get_cluster_security_groups(self):
cluster_security_groups = self._get_multi_param('ClusterSecurityGroups.member')
if not cluster_security_groups:
cluster_security_groups = self._get_multi_param('ClusterSecurityGroups.ClusterSecurityGroupName')
return cluster_security_groups
def _get_vpc_security_group_ids(self):
vpc_security_group_ids = self._get_multi_param('VpcSecurityGroupIds.member')
if not vpc_security_group_ids:
vpc_security_group_ids = self._get_multi_param('VpcSecurityGroupIds.VpcSecurityGroupId')
return vpc_security_group_ids
def _get_subnet_ids(self):
subnet_ids = self._get_multi_param('SubnetIds.member')
if not subnet_ids:
subnet_ids = self._get_multi_param('SubnetIds.SubnetIdentifier')
return subnet_ids
def create_cluster(self):
cluster_kwargs = {
"cluster_identifier": self._get_param('ClusterIdentifier'),
@ -74,8 +113,8 @@ class RedshiftResponse(BaseResponse):
"master_user_password": self._get_param('MasterUserPassword'),
"db_name": self._get_param('DBName'),
"cluster_type": self._get_param('ClusterType'),
"cluster_security_groups": self._get_multi_param('ClusterSecurityGroups.member'),
"vpc_security_group_ids": self._get_multi_param('VpcSecurityGroupIds.member'),
"cluster_security_groups": self._get_cluster_security_groups(),
"vpc_security_group_ids": self._get_vpc_security_group_ids(),
"cluster_subnet_group_name": self._get_param('ClusterSubnetGroupName'),
"availability_zone": self._get_param('AvailabilityZone'),
"preferred_maintenance_window": self._get_param('PreferredMaintenanceWindow'),
@ -116,10 +155,8 @@ class RedshiftResponse(BaseResponse):
"publicly_accessible": self._get_param("PubliclyAccessible"),
"cluster_parameter_group_name": self._get_param(
'ClusterParameterGroupName'),
"cluster_security_groups": self._get_multi_param(
'ClusterSecurityGroups.member'),
"vpc_security_group_ids": self._get_multi_param(
'VpcSecurityGroupIds.member'),
"cluster_security_groups": self._get_cluster_security_groups(),
"vpc_security_group_ids": self._get_vpc_security_group_ids(),
"preferred_maintenance_window": self._get_param(
'PreferredMaintenanceWindow'),
"automated_snapshot_retention_period": self._get_int_param(
@ -161,8 +198,8 @@ class RedshiftResponse(BaseResponse):
"node_type": self._get_param('NodeType'),
"master_user_password": self._get_param('MasterUserPassword'),
"cluster_type": self._get_param('ClusterType'),
"cluster_security_groups": self._get_multi_param('ClusterSecurityGroups.member'),
"vpc_security_group_ids": self._get_multi_param('VpcSecurityGroupIds.member'),
"cluster_security_groups": self._get_cluster_security_groups(),
"vpc_security_group_ids": self._get_vpc_security_group_ids(),
"cluster_subnet_group_name": self._get_param('ClusterSubnetGroupName'),
"preferred_maintenance_window": self._get_param('PreferredMaintenanceWindow'),
"cluster_parameter_group_name": self._get_param('ClusterParameterGroupName'),
@ -173,12 +210,6 @@ class RedshiftResponse(BaseResponse):
"publicly_accessible": self._get_param("PubliclyAccessible"),
"encrypted": self._get_param("Encrypted"),
}
# There's a bug in boto3 where the security group ids are not passed
# according to the AWS documentation
if not request_kwargs['vpc_security_group_ids']:
request_kwargs['vpc_security_group_ids'] = self._get_multi_param(
'VpcSecurityGroupIds.VpcSecurityGroupId')
cluster_kwargs = {}
# We only want parameters that were actually passed in, otherwise
# we'll stomp all over our cluster metadata with None values.
@ -217,11 +248,7 @@ class RedshiftResponse(BaseResponse):
def create_cluster_subnet_group(self):
cluster_subnet_group_name = self._get_param('ClusterSubnetGroupName')
description = self._get_param('Description')
subnet_ids = self._get_multi_param('SubnetIds.member')
# There's a bug in boto3 where the subnet ids are not passed
# according to the AWS documentation
if not subnet_ids:
subnet_ids = self._get_multi_param('SubnetIds.SubnetIdentifier')
subnet_ids = self._get_subnet_ids()
tags = self.unpack_complex_list_params('Tags.Tag', ('Key', 'Value'))
subnet_group = self.redshift_backend.create_cluster_subnet_group(

View File

@ -107,6 +107,62 @@ def render_template(tmpl_dir, tmpl_filename, context, service, alt_filename=None
f.write(rendered)
def append_mock_to_init_py(service):
path = os.path.join(os.path.dirname(__file__), '..', 'moto', '__init__.py')
with open(path) as f:
lines = [_.replace('\n', '') for _ in f.readlines()]
if any(_ for _ in lines if re.match('^from.*mock_{}.*$'.format(service), _)):
return
filtered_lines = [_ for _ in lines if re.match('^from.*mock.*$', _)]
last_import_line_index = lines.index(filtered_lines[-1])
new_line = 'from .{} import mock_{} # flake8: noqa'.format(service, service)
lines.insert(last_import_line_index + 1, new_line)
body = '\n'.join(lines) + '\n'
with open(path, 'w') as f:
f.write(body)
def append_mock_import_to_backends_py(service):
path = os.path.join(os.path.dirname(__file__), '..', 'moto', 'backends.py')
with open(path) as f:
lines = [_.replace('\n', '') for _ in f.readlines()]
if any(_ for _ in lines if re.match('^from moto\.{}.*{}_backends.*$'.format(service, service), _)):
return
filtered_lines = [_ for _ in lines if re.match('^from.*backends.*$', _)]
last_import_line_index = lines.index(filtered_lines[-1])
new_line = 'from moto.{} import {}_backends'.format(service, service)
lines.insert(last_import_line_index + 1, new_line)
body = '\n'.join(lines) + '\n'
with open(path, 'w') as f:
f.write(body)
def append_mock_dict_to_backends_py(service):
path = os.path.join(os.path.dirname(__file__), '..', 'moto', 'backends.py')
with open(path) as f:
lines = [_.replace('\n', '') for _ in f.readlines()]
# 'xray': xray_backends
if any(_ for _ in lines if re.match(".*'{}': {}_backends.*".format(service, service), _)):
return
filtered_lines = [_ for _ in lines if re.match(".*'.*':.*_backends.*", _)]
last_elem_line_index = lines.index(filtered_lines[-1])
new_line = " '{}': {}_backends,".format(service, service)
prev_line = lines[last_elem_line_index]
if not prev_line.endswith('{') and not prev_line.endswith(','):
lines[last_elem_line_index] += ','
lines.insert(last_elem_line_index + 1, new_line)
body = '\n'.join(lines) + '\n'
with open(path, 'w') as f:
f.write(body)
def initialize_service(service, operation, api_protocol):
"""create lib and test dirs if not exist
"""
@ -115,11 +171,14 @@ def initialize_service(service, operation, api_protocol):
print_progress('Initializing service', service, 'green')
service_class = boto3.client(service).__class__.__name__
client = boto3.client(service)
service_class = client.__class__.__name__
endpoint_prefix = client._service_model.endpoint_prefix
tmpl_context = {
'service': service,
'service_class': service_class
'service_class': service_class,
'endpoint_prefix': endpoint_prefix
}
# initialize service directory
@ -148,6 +207,11 @@ def initialize_service(service, operation, api_protocol):
tmpl_dir, tmpl_filename, tmpl_context, service, alt_filename
)
# append mock to init files
append_mock_to_init_py(service)
append_mock_import_to_backends_py(service)
append_mock_dict_to_backends_py(service)
def to_upper_camel_case(s):
return ''.join([_.title() for _ in s.split('_')])
@ -324,6 +388,41 @@ def insert_code_to_class(path, base_class, new_code):
f.write(body)
def insert_url(service, operation):
client = boto3.client(service)
service_class = client.__class__.__name__
aws_operation_name = to_upper_camel_case(operation)
uri = client._service_model.operation_model(aws_operation_name).http['requestUri']
path = os.path.join(os.path.dirname(__file__), '..', 'moto', service, 'urls.py')
with open(path) as f:
lines = [_.replace('\n', '') for _ in f.readlines()]
if any(_ for _ in lines if re.match(uri, _)):
return
url_paths_found = False
last_elem_line_index = -1
for i, line in enumerate(lines):
if line.startswith('url_paths'):
url_paths_found = True
if url_paths_found and line.startswith('}'):
last_elem_line_index = i - 1
prev_line = lines[last_elem_line_index]
if not prev_line.endswith('{') and not prev_line.endswith(','):
lines[last_elem_line_index] += ','
new_line = " '{0}%s$': %sResponse.dispatch," % (
uri, service_class
)
lines.insert(last_elem_line_index + 1, new_line)
body = '\n'.join(lines) + '\n'
with open(path, 'w') as f:
f.write(body)
def insert_query_codes(service, operation):
func_in_responses = get_function_in_responses(service, operation, 'query')
func_in_models = get_function_in_models(service, operation)
@ -346,6 +445,9 @@ def insert_query_codes(service, operation):
print_progress('inserting code', models_path, 'green')
insert_code_to_class(models_path, BaseBackend, func_in_models)
# edit urls.py
insert_url(service, operation)
def insert_json_codes(service, operation):
func_in_responses = get_function_in_responses(service, operation, 'json')
func_in_models = get_function_in_models(service, operation)
@ -360,6 +462,9 @@ def insert_json_codes(service, operation):
print_progress('inserting code', models_path, 'green')
insert_code_to_class(models_path, BaseBackend, func_in_models)
# edit urls.py
insert_url(service, operation)
def insert_restjson_codes(service, operation):
func_in_models = get_function_in_models(service, operation)
@ -369,6 +474,9 @@ def insert_restjson_codes(service, operation):
print_progress('inserting code', models_path, 'green')
insert_code_to_class(models_path, BaseBackend, func_in_models)
# edit urls.py
insert_url(service, operation)
@click.command()
def main():
service, operation = select_service_and_operation()
@ -383,7 +491,7 @@ def main():
else:
print_progress('skip inserting code', 'api protocol "{}" is not supported'.format(api_protocol), 'yellow')
click.echo('You will still need to make "{0}/urls.py", add the backend into "backends.py" and add the mock into "__init__.py"'.format(service))
click.echo('You will still need to add the mock into "__init__.py"'.format(service))
if __name__ == '__main__':
main()

View File

@ -0,0 +1,9 @@
from __future__ import unicode_literals
from .responses import {{ service_class }}Response
url_bases = [
"https?://{{ endpoint_prefix }}.(.+).amazonaws.com",
]
url_paths = {
}

View File

@ -13,7 +13,6 @@ install_requires = [
"cryptography>=2.0.0",
"requests>=2.5",
"xmltodict",
"dicttoxml",
"six>1.9",
"werkzeug",
"pyaml",
@ -37,7 +36,7 @@ else:
setup(
name='moto',
version='1.1.19',
version='1.1.21',
description='A library that allows your python tests to easily'
' mock out the boto library',
author='Steve Pulec',

View File

@ -645,3 +645,74 @@ def test_get_function_created_with_zipfile():
}
},
)
@mock_lambda
def add_function_permission():
conn = boto3.client('lambda', 'us-west-2')
zip_content = get_test_zip_file1()
result = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.handler',
Code={
'ZipFile': zip_content,
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.add_permission(
FunctionName='testFunction',
StatementId='1',
Action="lambda:InvokeFunction",
Principal='432143214321',
SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
SourceAccount='123412341234',
EventSourceToken='blah',
Qualifier='2'
)
assert 'Statement' in response
res = json.loads(response['Statement'])
assert res['Action'] == "lambda:InvokeFunction"
@mock_lambda
def get_function_policy():
conn = boto3.client('lambda', 'us-west-2')
zip_content = get_test_zip_file1()
result = conn.create_function(
FunctionName='testFunction',
Runtime='python2.7',
Role='test-iam-role',
Handler='lambda_function.handler',
Code={
'ZipFile': zip_content,
},
Description='test lambda function',
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.add_permission(
FunctionName='testFunction',
StatementId='1',
Action="lambda:InvokeFunction",
Principal='432143214321',
SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld",
SourceAccount='123412341234',
EventSourceToken='blah',
Qualifier='2'
)
response = conn.get_policy(
FunctionName='testFunction'
)
assert 'Policy' in response
assert isinstance(response['Policy'], str)
res = json.loads(response['Policy'])
assert res['Statement'][0]['Action'] == 'lambda:InvokeFunction'

View File

@ -14,10 +14,17 @@ def test_create_load_balancer():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = conn.create_load_balancer(
Name='my-lb',
@ -29,7 +36,8 @@ def test_create_load_balancer():
lb = response.get('LoadBalancers')[0]
lb.get('DNSName').should.equal("my-lb-1.us-east-1.elb.amazonaws.com")
lb.get('LoadBalancerArn').should.equal('arn:aws:elasticloadbalancing:us-east-1:1:loadbalancer/my-lb/50dc6c495c0c9188')
lb.get('LoadBalancerArn').should.equal(
'arn:aws:elasticloadbalancing:us-east-1:1:loadbalancer/my-lb/50dc6c495c0c9188')
lb.get('SecurityGroups').should.equal([security_group.id])
lb.get('AvailabilityZones').should.equal([
{'SubnetId': subnet1.id, 'ZoneName': 'us-east-1a'},
@ -37,7 +45,8 @@ def test_create_load_balancer():
# Ensure the tags persisted
response = conn.describe_tags(ResourceArns=[lb.get('LoadBalancerArn')])
tags = {d['Key']: d['Value'] for d in response['TagDescriptions'][0]['Tags']}
tags = {d['Key']: d['Value']
for d in response['TagDescriptions'][0]['Tags']}
tags.should.equal({'key_name': 'a_value'})
@ -47,10 +56,17 @@ def test_describe_load_balancers():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
conn.create_load_balancer(
Name='my-lb',
@ -65,11 +81,14 @@ def test_describe_load_balancers():
lb = response.get('LoadBalancers')[0]
lb.get('LoadBalancerName').should.equal('my-lb')
response = conn.describe_load_balancers(LoadBalancerArns=[lb.get('LoadBalancerArn')])
response.get('LoadBalancers')[0].get('LoadBalancerName').should.equal('my-lb')
response = conn.describe_load_balancers(
LoadBalancerArns=[lb.get('LoadBalancerArn')])
response.get('LoadBalancers')[0].get(
'LoadBalancerName').should.equal('my-lb')
response = conn.describe_load_balancers(Names=['my-lb'])
response.get('LoadBalancers')[0].get('LoadBalancerName').should.equal('my-lb')
response.get('LoadBalancers')[0].get(
'LoadBalancerName').should.equal('my-lb')
with assert_raises(ClientError):
conn.describe_load_balancers(LoadBalancerArns=['not-a/real/arn'])
@ -84,10 +103,17 @@ def test_add_remove_tags():
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
conn.create_load_balancer(
Name='my-lb',
@ -197,10 +223,19 @@ def test_create_elb_in_multiple_region():
conn = boto3.client('elbv2', region_name=region)
ec2 = boto3.resource('ec2', region_name=region)
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone=region + 'a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone=region + 'b')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(
CidrBlock='172.28.7.0/24',
InstanceTenancy='default')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone=region + 'a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone=region + 'b')
conn.create_load_balancer(
Name='my-lb',
@ -210,10 +245,14 @@ def test_create_elb_in_multiple_region():
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
list(
boto3.client('elbv2', region_name='us-west-1').describe_load_balancers().get('LoadBalancers')
boto3.client(
'elbv2',
region_name='us-west-1').describe_load_balancers().get('LoadBalancers')
).should.have.length_of(1)
list(
boto3.client('elbv2', region_name='us-west-2').describe_load_balancers().get('LoadBalancers')
boto3.client(
'elbv2',
region_name='us-west-2').describe_load_balancers().get('LoadBalancers')
).should.have.length_of(1)
@ -223,10 +262,17 @@ def test_create_target_group_and_listeners():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = conn.create_load_balancer(
Name='my-lb',
@ -254,7 +300,8 @@ def test_create_target_group_and_listeners():
target_group_arn = target_group['TargetGroupArn']
# Add tags to the target group
conn.add_tags(ResourceArns=[target_group_arn], Tags=[{'Key': 'target', 'Value': 'group'}])
conn.add_tags(ResourceArns=[target_group_arn], Tags=[
{'Key': 'target', 'Value': 'group'}])
conn.describe_tags(ResourceArns=[target_group_arn])['TagDescriptions'][0]['Tags'].should.equal(
[{'Key': 'target', 'Value': 'group'}])
@ -281,7 +328,8 @@ def test_create_target_group_and_listeners():
LoadBalancerArn=load_balancer_arn,
Protocol='HTTPS',
Port=443,
Certificates=[{'CertificateArn': 'arn:aws:iam:123456789012:server-certificate/test-cert'}],
Certificates=[
{'CertificateArn': 'arn:aws:iam:123456789012:server-certificate/test-cert'}],
DefaultActions=[{'Type': 'forward', 'TargetGroupArn': target_group.get('TargetGroupArn')}])
listener = response.get('Listeners')[0]
listener.get('Port').should.equal(443)
@ -303,9 +351,20 @@ def test_create_target_group_and_listeners():
listener.get('Port').should.equal(443)
listener.get('Protocol').should.equal('HTTPS')
response = conn.describe_listeners(ListenerArns=[http_listener_arn, https_listener_arn])
response = conn.describe_listeners(
ListenerArns=[
http_listener_arn,
https_listener_arn])
response.get('Listeners').should.have.length_of(2)
# Try to delete the target group and it fails because there's a
# listener referencing it
with assert_raises(ClientError) as e:
conn.delete_target_group(
TargetGroupArn=target_group.get('TargetGroupArn'))
e.exception.operation_name.should.equal('DeleteTargetGroup')
e.exception.args.should.equal(("An error occurred (ResourceInUse) when calling the DeleteTargetGroup operation: The target group 'arn:aws:elasticloadbalancing:us-east-1:1:targetgroup/a-target/50dc6c495c0c9188' is currently in use by a listener or a rule", )) # NOQA
# Delete one listener
response = conn.describe_listeners(LoadBalancerArn=load_balancer_arn)
response.get('Listeners').should.have.length_of(2)
@ -321,7 +380,10 @@ def test_create_target_group_and_listeners():
response.get('LoadBalancers').should.have.length_of(0)
# And it deleted the remaining listener
response = conn.describe_listeners(ListenerArns=[http_listener_arn, https_listener_arn])
response = conn.describe_listeners(
ListenerArns=[
http_listener_arn,
https_listener_arn])
response.get('Listeners').should.have.length_of(0)
# But not the target groups
@ -359,7 +421,13 @@ def test_create_invalid_target_group():
UnhealthyThresholdCount=2,
Matcher={'HttpCode': '200'})
invalid_names = ['-name', 'name-', '-name-', 'example.com', 'test@test', 'Na--me']
invalid_names = [
'-name',
'name-',
'-name-',
'example.com',
'test@test',
'Na--me']
for name in invalid_names:
with assert_raises(ClientError):
conn.create_target_group(
@ -399,10 +467,17 @@ def test_describe_paginated_balancers():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
for i in range(51):
conn.create_load_balancer(
@ -414,7 +489,8 @@ def test_describe_paginated_balancers():
resp = conn.describe_load_balancers()
resp['LoadBalancers'].should.have.length_of(50)
resp['NextMarker'].should.equal(resp['LoadBalancers'][-1]['LoadBalancerName'])
resp['NextMarker'].should.equal(
resp['LoadBalancers'][-1]['LoadBalancerName'])
resp2 = conn.describe_load_balancers(Marker=resp['NextMarker'])
resp2['LoadBalancers'].should.have.length_of(1)
assert 'NextToken' not in resp2.keys()
@ -426,10 +502,17 @@ def test_delete_load_balancer():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = conn.create_load_balancer(
Name='my-lb',
@ -452,10 +535,17 @@ def test_register_targets():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
conn.create_load_balancer(
Name='my-lb',
@ -480,7 +570,8 @@ def test_register_targets():
target_group = response.get('TargetGroups')[0]
# No targets registered yet
response = conn.describe_target_health(TargetGroupArn=target_group.get('TargetGroupArn'))
response = conn.describe_target_health(
TargetGroupArn=target_group.get('TargetGroupArn'))
response.get('TargetHealthDescriptions').should.have.length_of(0)
response = ec2.create_instances(
@ -501,14 +592,16 @@ def test_register_targets():
},
])
response = conn.describe_target_health(TargetGroupArn=target_group.get('TargetGroupArn'))
response = conn.describe_target_health(
TargetGroupArn=target_group.get('TargetGroupArn'))
response.get('TargetHealthDescriptions').should.have.length_of(2)
response = conn.deregister_targets(
TargetGroupArn=target_group.get('TargetGroupArn'),
Targets=[{'Id': instance_id2}])
response = conn.describe_target_health(TargetGroupArn=target_group.get('TargetGroupArn'))
response = conn.describe_target_health(
TargetGroupArn=target_group.get('TargetGroupArn'))
response.get('TargetHealthDescriptions').should.have.length_of(1)
@ -518,10 +611,17 @@ def test_target_group_attributes():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = conn.create_load_balancer(
Name='my-lb',
@ -557,9 +657,11 @@ def test_target_group_attributes():
target_group_arn = target_group['TargetGroupArn']
# The attributes should start with the two defaults
response = conn.describe_target_group_attributes(TargetGroupArn=target_group_arn)
response = conn.describe_target_group_attributes(
TargetGroupArn=target_group_arn)
response['Attributes'].should.have.length_of(2)
attributes = {attr['Key']: attr['Value'] for attr in response['Attributes']}
attributes = {attr['Key']: attr['Value']
for attr in response['Attributes']}
attributes['deregistration_delay.timeout_seconds'].should.equal('300')
attributes['stickiness.enabled'].should.equal('false')
@ -579,14 +681,17 @@ def test_target_group_attributes():
# The response should have only the keys updated
response['Attributes'].should.have.length_of(2)
attributes = {attr['Key']: attr['Value'] for attr in response['Attributes']}
attributes = {attr['Key']: attr['Value']
for attr in response['Attributes']}
attributes['stickiness.type'].should.equal('lb_cookie')
attributes['stickiness.enabled'].should.equal('true')
# These new values should be in the full attribute list
response = conn.describe_target_group_attributes(TargetGroupArn=target_group_arn)
response = conn.describe_target_group_attributes(
TargetGroupArn=target_group_arn)
response['Attributes'].should.have.length_of(3)
attributes = {attr['Key']: attr['Value'] for attr in response['Attributes']}
attributes = {attr['Key']: attr['Value']
for attr in response['Attributes']}
attributes['stickiness.type'].should.equal('lb_cookie')
attributes['stickiness.enabled'].should.equal('true')
@ -597,10 +702,17 @@ def test_handle_listener_rules():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = conn.create_load_balancer(
Name='my-lb',
@ -649,11 +761,11 @@ def test_handle_listener_rules():
Priority=priority,
Conditions=[{
'Field': 'host-header',
'Values': [ host ]
'Values': [host]
},
{
{
'Field': 'path-pattern',
'Values': [ path_pattern ]
'Values': [path_pattern]
}],
Actions=[{
'TargetGroupArn': target_group.get('TargetGroupArn'),
@ -671,11 +783,11 @@ def test_handle_listener_rules():
Priority=priority,
Conditions=[{
'Field': 'host-header',
'Values': [ host ]
'Values': [host]
},
{
{
'Field': 'path-pattern',
'Values': [ path_pattern ]
'Values': [path_pattern]
}],
Actions=[{
'TargetGroupArn': target_group.get('TargetGroupArn'),
@ -684,18 +796,17 @@ def test_handle_listener_rules():
)
# test for PriorityInUse
host2 = 'yyy.example.com'
with assert_raises(ClientError):
r = conn.create_rule(
conn.create_rule(
ListenerArn=http_listener_arn,
Priority=priority,
Conditions=[{
'Field': 'host-header',
'Values': [ host ]
'Values': [host]
},
{
{
'Field': 'path-pattern',
'Values': [ path_pattern ]
'Values': [path_pattern]
}],
Actions=[{
'TargetGroupArn': target_group.get('TargetGroupArn'),
@ -703,7 +814,6 @@ def test_handle_listener_rules():
}]
)
# test for describe listeners
obtained_rules = conn.describe_rules(ListenerArn=http_listener_arn)
len(obtained_rules['Rules']).should.equal(3)
@ -716,15 +826,20 @@ def test_handle_listener_rules():
obtained_rules['Rules'].should.equal([first_rule])
# test for pagination
obtained_rules = conn.describe_rules(ListenerArn=http_listener_arn, PageSize=1)
obtained_rules = conn.describe_rules(
ListenerArn=http_listener_arn, PageSize=1)
len(obtained_rules['Rules']).should.equal(1)
obtained_rules.should.have.key('NextMarker')
next_marker = obtained_rules['NextMarker']
following_rules = conn.describe_rules(ListenerArn=http_listener_arn, PageSize=1, Marker=next_marker)
following_rules = conn.describe_rules(
ListenerArn=http_listener_arn,
PageSize=1,
Marker=next_marker)
len(following_rules['Rules']).should.equal(1)
following_rules.should.have.key('NextMarker')
following_rules['Rules'][0]['RuleArn'].should_not.equal(obtained_rules['Rules'][0]['RuleArn'])
following_rules['Rules'][0]['RuleArn'].should_not.equal(
obtained_rules['Rules'][0]['RuleArn'])
# test for invalid describe rule request
with assert_raises(ClientError):
@ -743,13 +858,13 @@ def test_handle_listener_rules():
modified_rule = conn.modify_rule(
RuleArn=first_rule['RuleArn'],
Conditions=[{
'Field': 'host-header',
'Values': [ new_host ]
},
'Field': 'host-header',
'Values': [new_host]
},
{
'Field': 'path-pattern',
'Values': [ new_path_pattern ]
}]
'Values': [new_path_pattern]
}]
)['Rules'][0]
rules = conn.describe_rules(ListenerArn=http_listener_arn)
@ -757,12 +872,14 @@ def test_handle_listener_rules():
modified_rule.should.equal(obtained_rule)
obtained_rule['Conditions'][0]['Values'][0].should.equal(new_host)
obtained_rule['Conditions'][1]['Values'][0].should.equal(new_path_pattern)
obtained_rule['Actions'][0]['TargetGroupArn'].should.equal(target_group.get('TargetGroupArn'))
obtained_rule['Actions'][0]['TargetGroupArn'].should.equal(
target_group.get('TargetGroupArn'))
# modify priority
conn.set_rule_priorities(
RulePriorities=[
{'RuleArn': first_rule['RuleArn'], 'Priority': int(first_rule['Priority']) - 1}
{'RuleArn': first_rule['RuleArn'],
'Priority': int(first_rule['Priority']) - 1}
]
)
with assert_raises(ClientError):
@ -782,16 +899,16 @@ def test_handle_listener_rules():
# test for invalid action type
safe_priority = 2
with assert_raises(ClientError):
r = conn.create_rule(
conn.create_rule(
ListenerArn=http_listener_arn,
Priority=safe_priority,
Conditions=[{
'Field': 'host-header',
'Values': [ host ]
'Values': [host]
},
{
{
'Field': 'path-pattern',
'Values': [ path_pattern ]
'Values': [path_pattern]
}],
Actions=[{
'TargetGroupArn': target_group.get('TargetGroupArn'),
@ -803,16 +920,16 @@ def test_handle_listener_rules():
safe_priority = 2
invalid_target_group_arn = target_group.get('TargetGroupArn') + 'x'
with assert_raises(ClientError):
r = conn.create_rule(
conn.create_rule(
ListenerArn=http_listener_arn,
Priority=safe_priority,
Conditions=[{
'Field': 'host-header',
'Values': [ host ]
'Values': [host]
},
{
{
'Field': 'path-pattern',
'Values': [ path_pattern ]
'Values': [path_pattern]
}],
Actions=[{
'TargetGroupArn': invalid_target_group_arn,
@ -823,12 +940,12 @@ def test_handle_listener_rules():
# test for invalid condition field_name
safe_priority = 2
with assert_raises(ClientError):
r = conn.create_rule(
conn.create_rule(
ListenerArn=http_listener_arn,
Priority=safe_priority,
Conditions=[{
'Field': 'xxxxxxx',
'Values': [ host ]
'Values': [host]
}],
Actions=[{
'TargetGroupArn': target_group.get('TargetGroupArn'),
@ -839,7 +956,7 @@ def test_handle_listener_rules():
# test for emptry condition value
safe_priority = 2
with assert_raises(ClientError):
r = conn.create_rule(
conn.create_rule(
ListenerArn=http_listener_arn,
Priority=safe_priority,
Conditions=[{
@ -855,7 +972,7 @@ def test_handle_listener_rules():
# test for multiple condition value
safe_priority = 2
with assert_raises(ClientError):
r = conn.create_rule(
conn.create_rule(
ListenerArn=http_listener_arn,
Priority=safe_priority,
Conditions=[{
@ -875,10 +992,17 @@ def test_describe_invalid_target_group():
conn = boto3.client('elbv2', region_name='us-east-1')
ec2 = boto3.resource('ec2', region_name='us-east-1')
security_group = ec2.create_security_group(GroupName='a-security-group', Description='First One')
security_group = ec2.create_security_group(
GroupName='a-security-group', Description='First One')
vpc = ec2.create_vpc(CidrBlock='172.28.7.0/24', InstanceTenancy='default')
subnet1 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(VpcId=vpc.id, CidrBlock='172.28.7.192/26', AvailabilityZone='us-east-1b')
subnet1 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1a')
subnet2 = ec2.create_subnet(
VpcId=vpc.id,
CidrBlock='172.28.7.192/26',
AvailabilityZone='us-east-1b')
response = conn.create_load_balancer(
Name='my-lb',
@ -887,7 +1011,7 @@ def test_describe_invalid_target_group():
Scheme='internal',
Tags=[{'Key': 'key_name', 'Value': 'a_value'}])
load_balancer_arn = response.get('LoadBalancers')[0].get('LoadBalancerArn')
response.get('LoadBalancers')[0].get('LoadBalancerArn')
response = conn.create_target_group(
Name='a-target',
@ -902,7 +1026,6 @@ def test_describe_invalid_target_group():
HealthyThresholdCount=5,
UnhealthyThresholdCount=2,
Matcher={'HttpCode': '200'})
target_group = response.get('TargetGroups')[0]
# Check error raises correctly
with assert_raises(ClientError):

View File

@ -82,6 +82,26 @@ def test_put_group_policy():
conn.put_group_policy('my-group', 'my-policy', '{"some": "json"}')
@mock_iam
def test_attach_group_policies():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_group(GroupName='my-group')
conn.list_attached_group_policies(GroupName='my-group')['AttachedPolicies'].should.be.empty
policy_arn = 'arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role'
conn.list_attached_group_policies(GroupName='my-group')['AttachedPolicies'].should.be.empty
conn.attach_group_policy(GroupName='my-group', PolicyArn=policy_arn)
conn.list_attached_group_policies(GroupName='my-group')['AttachedPolicies'].should.equal(
[
{
'PolicyName': 'AmazonElasticMapReduceforEC2Role',
'PolicyArn': policy_arn,
}
])
conn.detach_group_policy(GroupName='my-group', PolicyArn=policy_arn)
conn.list_attached_group_policies(GroupName='my-group')['AttachedPolicies'].should.be.empty
@mock_iam_deprecated()
def test_get_group_policy():
conn = boto.connect_iam()
@ -90,7 +110,8 @@ def test_get_group_policy():
conn.get_group_policy('my-group', 'my-policy')
conn.put_group_policy('my-group', 'my-policy', '{"some": "json"}')
policy = conn.get_group_policy('my-group', 'my-policy')
conn.get_group_policy('my-group', 'my-policy')
@mock_iam_deprecated()
def test_get_all_group_policies():
@ -107,6 +128,6 @@ def test_get_all_group_policies():
def test_list_group_policies():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_group(GroupName='my-group')
policies = conn.list_group_policies(GroupName='my-group')['PolicyNames'].should.be.empty
conn.list_group_policies(GroupName='my-group')['PolicyNames'].should.be.empty
conn.put_group_policy(GroupName='my-group', PolicyName='my-policy', PolicyDocument='{"some": "json"}')
policies = conn.list_group_policies(GroupName='my-group')['PolicyNames'].should.equal(['my-policy'])
conn.list_group_policies(GroupName='my-group')['PolicyNames'].should.equal(['my-policy'])

View File

@ -216,6 +216,33 @@ def test_create_cluster_with_security_group():
set(group_names).should.equal(set(["security_group1", "security_group2"]))
@mock_redshift
def test_create_cluster_with_security_group_boto3():
client = boto3.client('redshift', region_name='us-east-1')
client.create_cluster_security_group(
ClusterSecurityGroupName="security_group1",
Description="This is my security group",
)
client.create_cluster_security_group(
ClusterSecurityGroupName="security_group2",
Description="This is my security group",
)
cluster_identifier = 'my_cluster'
client.create_cluster(
ClusterIdentifier=cluster_identifier,
NodeType="dw.hs1.xlarge",
MasterUsername="username",
MasterUserPassword="password",
ClusterSecurityGroups=["security_group1", "security_group2"]
)
response = client.describe_clusters(ClusterIdentifier=cluster_identifier)
cluster = response['Clusters'][0]
group_names = [group['ClusterSecurityGroupName']
for group in cluster['ClusterSecurityGroups']]
set(group_names).should.equal({"security_group1", "security_group2"})
@mock_redshift_deprecated
@mock_ec2_deprecated
def test_create_cluster_with_vpc_security_groups():
@ -242,6 +269,31 @@ def test_create_cluster_with_vpc_security_groups():
list(group_ids).should.equal([security_group.id])
@mock_redshift
@mock_ec2
def test_create_cluster_with_vpc_security_groups_boto3():
ec2 = boto3.resource('ec2', region_name='us-east-1')
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
client = boto3.client('redshift', region_name='us-east-1')
cluster_id = 'my_cluster'
security_group = ec2.create_security_group(
Description="vpc_security_group",
GroupName="a group",
VpcId=vpc.id)
client.create_cluster(
ClusterIdentifier=cluster_id,
NodeType="dw.hs1.xlarge",
MasterUsername="username",
MasterUserPassword="password",
VpcSecurityGroupIds=[security_group.id],
)
response = client.describe_clusters(ClusterIdentifier=cluster_id)
cluster = response['Clusters'][0]
group_ids = [group['VpcSecurityGroupId']
for group in cluster['VpcSecurityGroups']]
list(group_ids).should.equal([security_group.id])
@mock_redshift_deprecated
def test_create_cluster_with_parameter_group():
conn = boto.connect_redshift()