Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Stephan Huber 2019-08-30 14:18:01 +02:00
parent 3020ee408a
commit b94147a1d5
14 changed files with 4499 additions and 0 deletions

365
moto/core/access_control.py Normal file
View File

@ -0,0 +1,365 @@
"""
This implementation is NOT complete, there are many things to improve.
The following is a list of the most important missing features and inaccuracies.
TODO add support for more principals, apart from IAM users and assumed IAM roles
TODO add support for the Resource and Condition parts of IAM policies
TODO add support and create tests for all services in moto (for example, API Gateway is probably not supported currently)
TODO implement service specific error messages (currently, EC2 and S3 are supported separately, everything else defaults to the errors IAM returns)
TODO include information about the action's resource in error messages (once the Resource element in IAM policies is supported)
TODO check all other actions that are performed by the action called by the user (for example, autoscaling:CreateAutoScalingGroup requires permission for iam:CreateServiceLinkedRole too - see https://docs.aws.amazon.com/autoscaling/ec2/userguide/control-access-using-iam.html)
TODO add support for resource-based policies
"""
import json
import logging
import re
from abc import abstractmethod, ABCMeta
from enum import Enum
import six
from botocore.auth import SigV4Auth, S3SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from six import string_types
from moto.iam.models import ACCOUNT_ID, Policy
from moto.iam import iam_backend
from moto.core.exceptions import SignatureDoesNotMatchError, AccessDeniedError, InvalidClientTokenIdError, AuthFailureError
from moto.s3.exceptions import (
BucketAccessDeniedError,
S3AccessDeniedError,
BucketInvalidTokenError,
S3InvalidTokenError,
S3InvalidAccessKeyIdError,
BucketInvalidAccessKeyIdError,
BucketSignatureDoesNotMatchError,
S3SignatureDoesNotMatchError
)
from moto.sts import sts_backend
log = logging.getLogger(__name__)
def create_access_key(access_key_id, headers):
if access_key_id.startswith("AKIA") or "X-Amz-Security-Token" not in headers:
return IAMUserAccessKey(access_key_id, headers)
else:
return AssumedRoleAccessKey(access_key_id, headers)
class IAMUserAccessKey(object):
def __init__(self, access_key_id, headers):
iam_users = iam_backend.list_users('/', None, None)
for iam_user in iam_users:
for access_key in iam_user.access_keys:
if access_key.access_key_id == access_key_id:
self._owner_user_name = iam_user.name
self._access_key_id = access_key_id
self._secret_access_key = access_key.secret_access_key
if "X-Amz-Security-Token" in headers:
raise CreateAccessKeyFailure(reason="InvalidToken")
return
raise CreateAccessKeyFailure(reason="InvalidId")
@property
def arn(self):
return "arn:aws:iam::{account_id}:user/{iam_user_name}".format(
account_id=ACCOUNT_ID,
iam_user_name=self._owner_user_name
)
def create_credentials(self):
return Credentials(self._access_key_id, self._secret_access_key)
def collect_policies(self):
user_policies = []
inline_policy_names = iam_backend.list_user_policies(self._owner_user_name)
for inline_policy_name in inline_policy_names:
inline_policy = iam_backend.get_user_policy(self._owner_user_name, inline_policy_name)
user_policies.append(inline_policy)
attached_policies, _ = iam_backend.list_attached_user_policies(self._owner_user_name)
user_policies += attached_policies
user_groups = iam_backend.get_groups_for_user(self._owner_user_name)
for user_group in user_groups:
inline_group_policy_names = iam_backend.list_group_policies(user_group.name)
for inline_group_policy_name in inline_group_policy_names:
inline_user_group_policy = iam_backend.get_group_policy(user_group.name, inline_group_policy_name)
user_policies.append(inline_user_group_policy)
attached_group_policies, _ = iam_backend.list_attached_group_policies(user_group.name)
user_policies += attached_group_policies
return user_policies
class AssumedRoleAccessKey(object):
def __init__(self, access_key_id, headers):
for assumed_role in sts_backend.assumed_roles:
if assumed_role.access_key_id == access_key_id:
self._access_key_id = access_key_id
self._secret_access_key = assumed_role.secret_access_key
self._session_token = assumed_role.session_token
self._owner_role_name = assumed_role.role_arn.split("/")[-1]
self._session_name = assumed_role.session_name
if headers["X-Amz-Security-Token"] != self._session_token:
raise CreateAccessKeyFailure(reason="InvalidToken")
return
raise CreateAccessKeyFailure(reason="InvalidId")
@property
def arn(self):
return "arn:aws:sts::{account_id}:assumed-role/{role_name}/{session_name}".format(
account_id=ACCOUNT_ID,
role_name=self._owner_role_name,
session_name=self._session_name
)
def create_credentials(self):
return Credentials(self._access_key_id, self._secret_access_key, self._session_token)
def collect_policies(self):
role_policies = []
inline_policy_names = iam_backend.list_role_policies(self._owner_role_name)
for inline_policy_name in inline_policy_names:
_, inline_policy = iam_backend.get_role_policy(self._owner_role_name, inline_policy_name)
role_policies.append(inline_policy)
attached_policies, _ = iam_backend.list_attached_role_policies(self._owner_role_name)
role_policies += attached_policies
return role_policies
class CreateAccessKeyFailure(Exception):
def __init__(self, reason, *args):
super(CreateAccessKeyFailure, self).__init__(*args)
self.reason = reason
@six.add_metaclass(ABCMeta)
class IAMRequestBase(object):
def __init__(self, method, path, data, headers):
log.debug("Creating {class_name} with method={method}, path={path}, data={data}, headers={headers}".format(
class_name=self.__class__.__name__, method=method, path=path, data=data, headers=headers))
self._method = method
self._path = path
self._data = data
self._headers = headers
credential_scope = self._get_string_between('Credential=', ',', self._headers['Authorization'])
credential_data = credential_scope.split('/')
self._region = credential_data[2]
self._service = credential_data[3]
self._action = self._service + ":" + (self._data["Action"][0] if isinstance(self._data["Action"], list) else self._data["Action"])
try:
self._access_key = create_access_key(access_key_id=credential_data[0], headers=headers)
except CreateAccessKeyFailure as e:
self._raise_invalid_access_key(e.reason)
def check_signature(self):
original_signature = self._get_string_between('Signature=', ',', self._headers['Authorization'])
calculated_signature = self._calculate_signature()
if original_signature != calculated_signature:
self._raise_signature_does_not_match()
def check_action_permitted(self):
if self._action == 'sts:GetCallerIdentity': # always allowed, even if there's an explicit Deny for it
return True
policies = self._access_key.collect_policies()
permitted = False
for policy in policies:
iam_policy = IAMPolicy(policy)
permission_result = iam_policy.is_action_permitted(self._action)
if permission_result == PermissionResult.DENIED:
self._raise_access_denied()
elif permission_result == PermissionResult.PERMITTED:
permitted = True
if not permitted:
self._raise_access_denied()
@abstractmethod
def _raise_signature_does_not_match(self):
raise NotImplementedError()
@abstractmethod
def _raise_access_denied(self):
raise NotImplementedError()
@abstractmethod
def _raise_invalid_access_key(self, reason):
raise NotImplementedError()
@abstractmethod
def _create_auth(self, credentials):
raise NotImplementedError()
@staticmethod
def _create_headers_for_aws_request(signed_headers, original_headers):
headers = {}
for key, value in original_headers.items():
if key.lower() in signed_headers:
headers[key] = value
return headers
def _create_aws_request(self):
signed_headers = self._get_string_between('SignedHeaders=', ',', self._headers['Authorization']).split(';')
headers = self._create_headers_for_aws_request(signed_headers, self._headers)
request = AWSRequest(method=self._method, url=self._path, data=self._data, headers=headers)
request.context['timestamp'] = headers['X-Amz-Date']
return request
def _calculate_signature(self):
credentials = self._access_key.create_credentials()
auth = self._create_auth(credentials)
request = self._create_aws_request()
canonical_request = auth.canonical_request(request)
string_to_sign = auth.string_to_sign(request, canonical_request)
return auth.signature(string_to_sign, request)
@staticmethod
def _get_string_between(first_separator, second_separator, string):
return string.partition(first_separator)[2].partition(second_separator)[0]
class IAMRequest(IAMRequestBase):
def _raise_signature_does_not_match(self):
if self._service == "ec2":
raise AuthFailureError()
else:
raise SignatureDoesNotMatchError()
def _raise_invalid_access_key(self, _):
if self._service == "ec2":
raise AuthFailureError()
else:
raise InvalidClientTokenIdError()
def _create_auth(self, credentials):
return SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self):
raise AccessDeniedError(
user_arn=self._access_key.arn,
action=self._action
)
class S3IAMRequest(IAMRequestBase):
def _raise_signature_does_not_match(self):
if "BucketName" in self._data:
raise BucketSignatureDoesNotMatchError(bucket=self._data["BucketName"])
else:
raise S3SignatureDoesNotMatchError()
def _raise_invalid_access_key(self, reason):
if reason == "InvalidToken":
if "BucketName" in self._data:
raise BucketInvalidTokenError(bucket=self._data["BucketName"])
else:
raise S3InvalidTokenError()
else:
if "BucketName" in self._data:
raise BucketInvalidAccessKeyIdError(bucket=self._data["BucketName"])
else:
raise S3InvalidAccessKeyIdError()
def _create_auth(self, credentials):
return S3SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self):
if "BucketName" in self._data:
raise BucketAccessDeniedError(bucket=self._data["BucketName"])
else:
raise S3AccessDeniedError()
class IAMPolicy(object):
def __init__(self, policy):
if isinstance(policy, Policy):
default_version = next(policy_version for policy_version in policy.versions if policy_version.is_default)
policy_document = default_version.document
elif isinstance(policy, string_types):
policy_document = policy
else:
policy_document = policy["policy_document"]
self._policy_json = json.loads(policy_document)
def is_action_permitted(self, action):
permitted = False
if isinstance(self._policy_json["Statement"], list):
for policy_statement in self._policy_json["Statement"]:
iam_policy_statement = IAMPolicyStatement(policy_statement)
permission_result = iam_policy_statement.is_action_permitted(action)
if permission_result == PermissionResult.DENIED:
return permission_result
elif permission_result == PermissionResult.PERMITTED:
permitted = True
else: # dict
iam_policy_statement = IAMPolicyStatement(self._policy_json["Statement"])
return iam_policy_statement.is_action_permitted(action)
if permitted:
return PermissionResult.PERMITTED
else:
return PermissionResult.NEUTRAL
class IAMPolicyStatement(object):
def __init__(self, statement):
self._statement = statement
def is_action_permitted(self, action):
is_action_concerned = False
if "NotAction" in self._statement:
if not self._check_element_matches("NotAction", action):
is_action_concerned = True
else: # Action is present
if self._check_element_matches("Action", action):
is_action_concerned = True
if is_action_concerned:
if self._statement["Effect"] == "Allow":
return PermissionResult.PERMITTED
else: # Deny
return PermissionResult.DENIED
else:
return PermissionResult.NEUTRAL
def _check_element_matches(self, statement_element, value):
if isinstance(self._statement[statement_element], list):
for statement_element_value in self._statement[statement_element]:
if self._match(statement_element_value, value):
return True
return False
else: # string
return self._match(self._statement[statement_element], value)
@staticmethod
def _match(pattern, string):
pattern = pattern.replace("*", ".*")
pattern = "^{pattern}$".format(pattern=pattern)
return re.match(pattern, string)
class PermissionResult(Enum):
PERMITTED = 1
DENIED = 2
NEUTRAL = 3

View File

@ -0,0 +1,252 @@
import six
import uuid
from moto.core.responses import BaseResponse
from moto.ec2.models import OWNER_ID
from moto.ec2.exceptions import FilterNotImplementedError
from moto.ec2.utils import filters_from_querystring
from xml.etree import ElementTree
from xml.dom import minidom
def xml_root(name):
root = ElementTree.Element(name, {
"xmlns": "http://ec2.amazonaws.com/doc/2016-11-15/"
})
request_id = str(uuid.uuid4()) + "example"
ElementTree.SubElement(root, "requestId").text = request_id
return root
def xml_serialize(tree, key, value):
name = key[0].lower() + key[1:]
if isinstance(value, list):
if name[-1] == 's':
name = name[:-1]
name = name + 'Set'
node = ElementTree.SubElement(tree, name)
if isinstance(value, (str, int, float, six.text_type)):
node.text = str(value)
elif isinstance(value, dict):
for dictkey, dictvalue in six.iteritems(value):
xml_serialize(node, dictkey, dictvalue)
elif isinstance(value, list):
for item in value:
xml_serialize(node, 'item', item)
elif value is None:
pass
else:
raise NotImplementedError("Don't know how to serialize \"{}\" to xml".format(value.__class__))
def pretty_xml(tree):
rough = ElementTree.tostring(tree, 'utf-8')
parsed = minidom.parseString(rough)
return parsed.toprettyxml(indent=' ')
def parse_object(raw_data):
out_data = {}
for key, value in six.iteritems(raw_data):
key_fix_splits = key.split("_")
key_len = len(key_fix_splits)
new_key = ""
for i in range(0, key_len):
new_key += key_fix_splits[i][0].upper() + key_fix_splits[i][1:]
data = out_data
splits = new_key.split(".")
for split in splits[:-1]:
if split not in data:
data[split] = {}
data = data[split]
data[splits[-1]] = value
out_data = parse_lists(out_data)
return out_data
def parse_lists(data):
for key, value in six.iteritems(data):
if isinstance(value, dict):
keys = data[key].keys()
is_list = all(map(lambda k: k.isnumeric(), keys))
if is_list:
new_value = []
keys = sorted(list(keys))
for k in keys:
lvalue = value[k]
if isinstance(lvalue, dict):
lvalue = parse_lists(lvalue)
new_value.append(lvalue)
data[key] = new_value
return data
class LaunchTemplates(BaseResponse):
def create_launch_template(self):
name = self._get_param('LaunchTemplateName')
version_description = self._get_param('VersionDescription')
tag_spec = self._parse_tag_specification("TagSpecification")
raw_template_data = self._get_dict_param('LaunchTemplateData.')
parsed_template_data = parse_object(raw_template_data)
if self.is_not_dryrun('CreateLaunchTemplate'):
if tag_spec:
if 'TagSpecifications' not in parsed_template_data:
parsed_template_data['TagSpecifications'] = []
converted_tag_spec = []
for resource_type, tags in six.iteritems(tag_spec):
converted_tag_spec.append({
"ResourceType": resource_type,
"Tags": [{"Key": key, "Value": value} for key, value in six.iteritems(tags)],
})
parsed_template_data['TagSpecifications'].extend(converted_tag_spec)
template = self.ec2_backend.create_launch_template(name, version_description, parsed_template_data)
version = template.default_version()
tree = xml_root("CreateLaunchTemplateResponse")
xml_serialize(tree, "launchTemplate", {
"createTime": version.create_time,
"createdBy": "arn:aws:iam::{OWNER_ID}:root".format(OWNER_ID=OWNER_ID),
"defaultVersionNumber": template.default_version_number,
"latestVersionNumber": version.number,
"launchTemplateId": template.id,
"launchTemplateName": template.name
})
return pretty_xml(tree)
def create_launch_template_version(self):
name = self._get_param('LaunchTemplateName')
tmpl_id = self._get_param('LaunchTemplateId')
if name:
template = self.ec2_backend.get_launch_template_by_name(name)
if tmpl_id:
template = self.ec2_backend.get_launch_template(tmpl_id)
version_description = self._get_param('VersionDescription')
raw_template_data = self._get_dict_param('LaunchTemplateData.')
template_data = parse_object(raw_template_data)
if self.is_not_dryrun('CreateLaunchTemplate'):
version = template.create_version(template_data, version_description)
tree = xml_root("CreateLaunchTemplateVersionResponse")
xml_serialize(tree, "launchTemplateVersion", {
"createTime": version.create_time,
"createdBy": "arn:aws:iam::{OWNER_ID}:root".format(OWNER_ID=OWNER_ID),
"defaultVersion": template.is_default(version),
"launchTemplateData": version.data,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
"versionDescription": version.description,
"versionNumber": version.number,
})
return pretty_xml(tree)
# def delete_launch_template(self):
# pass
# def delete_launch_template_versions(self):
# pass
def describe_launch_template_versions(self):
name = self._get_param('LaunchTemplateName')
template_id = self._get_param('LaunchTemplateId')
if name:
template = self.ec2_backend.get_launch_template_by_name(name)
if template_id:
template = self.ec2_backend.get_launch_template(template_id)
max_results = self._get_int_param("MaxResults", 15)
versions = self._get_multi_param("LaunchTemplateVersion")
min_version = self._get_int_param("MinVersion")
max_version = self._get_int_param("MaxVersion")
filters = filters_from_querystring(self.querystring)
if filters:
raise FilterNotImplementedError("all filters", "DescribeLaunchTemplateVersions")
if self.is_not_dryrun('DescribeLaunchTemplateVersions'):
tree = ElementTree.Element("DescribeLaunchTemplateVersionsResponse", {
"xmlns": "http://ec2.amazonaws.com/doc/2016-11-15/",
})
request_id = ElementTree.SubElement(tree, "requestId")
request_id.text = "65cadec1-b364-4354-8ca8-4176dexample"
versions_node = ElementTree.SubElement(tree, "launchTemplateVersionSet")
ret_versions = []
if versions:
for v in versions:
ret_versions.append(template.get_version(int(v)))
elif min_version:
if max_version:
vMax = max_version
else:
vMax = min_version + max_results
vMin = min_version - 1
ret_versions = template.versions[vMin:vMax]
elif max_version:
vMax = max_version
ret_versions = template.versions[:vMax]
else:
ret_versions = template.versions
ret_versions = ret_versions[:max_results]
for version in ret_versions:
xml_serialize(versions_node, "item", {
"createTime": version.create_time,
"createdBy": "arn:aws:iam::{OWNER_ID}:root".format(OWNER_ID=OWNER_ID),
"defaultVersion": True,
"launchTemplateData": version.data,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
"versionDescription": version.description,
"versionNumber": version.number,
})
return pretty_xml(tree)
def describe_launch_templates(self):
max_results = self._get_int_param("MaxResults", 15)
template_names = self._get_multi_param("LaunchTemplateName")
template_ids = self._get_multi_param("LaunchTemplateId")
filters = filters_from_querystring(self.querystring)
if self.is_not_dryrun("DescribeLaunchTemplates"):
tree = ElementTree.Element("DescribeLaunchTemplatesResponse")
templates_node = ElementTree.SubElement(tree, "launchTemplates")
templates = self.ec2_backend.get_launch_templates(template_names=template_names, template_ids=template_ids, filters=filters)
templates = templates[:max_results]
for template in templates:
xml_serialize(templates_node, "item", {
"createTime": template.create_time,
"createdBy": "arn:aws:iam::{OWNER_ID}:root".format(OWNER_ID=OWNER_ID),
"defaultVersionNumber": template.default_version_number,
"latestVersionNumber": template.latest_version_number,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
})
return pretty_xml(tree)
# def modify_launch_template(self):
# pass

View File

@ -0,0 +1,450 @@
import json
import re
from six import string_types
from moto.iam.exceptions import MalformedPolicyDocument
VALID_TOP_ELEMENTS = [
"Version",
"Id",
"Statement",
"Conditions"
]
VALID_VERSIONS = [
"2008-10-17",
"2012-10-17"
]
VALID_STATEMENT_ELEMENTS = [
"Sid",
"Action",
"NotAction",
"Resource",
"NotResource",
"Effect",
"Condition"
]
VALID_EFFECTS = [
"Allow",
"Deny"
]
VALID_CONDITIONS = [
"StringEquals",
"StringNotEquals",
"StringEqualsIgnoreCase",
"StringNotEqualsIgnoreCase",
"StringLike",
"StringNotLike",
"NumericEquals",
"NumericNotEquals",
"NumericLessThan",
"NumericLessThanEquals",
"NumericGreaterThan",
"NumericGreaterThanEquals",
"DateEquals",
"DateNotEquals",
"DateLessThan",
"DateLessThanEquals",
"DateGreaterThan",
"DateGreaterThanEquals",
"Bool",
"BinaryEquals",
"IpAddress",
"NotIpAddress",
"ArnEquals",
"ArnLike",
"ArnNotEquals",
"ArnNotLike",
"Null"
]
VALID_CONDITION_PREFIXES = [
"ForAnyValue:",
"ForAllValues:"
]
VALID_CONDITION_POSTFIXES = [
"IfExists"
]
SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS = {
"iam": 'IAM resource {resource} cannot contain region information.',
"s3": 'Resource {resource} can not contain region information.'
}
VALID_RESOURCE_PATH_STARTING_VALUES = {
"iam": {
"values": ["user/", "federated-user/", "role/", "group/", "instance-profile/", "mfa/", "server-certificate/",
"policy/", "sms-mfa/", "saml-provider/", "oidc-provider/", "report/", "access-report/"],
"error_message": 'IAM resource path must either be "*" or start with {values}.'
}
}
class IAMPolicyDocumentValidator:
def __init__(self, policy_document):
self._policy_document = policy_document
self._policy_json = {}
self._statements = []
self._resource_error = "" # the first resource error found that does not generate a legacy parsing error
def validate(self):
try:
self._validate_syntax()
except Exception:
raise MalformedPolicyDocument("Syntax errors in policy.")
try:
self._validate_version()
except Exception:
raise MalformedPolicyDocument("Policy document must be version 2012-10-17 or greater.")
try:
self._perform_first_legacy_parsing()
self._validate_resources_for_formats()
self._validate_not_resources_for_formats()
except Exception:
raise MalformedPolicyDocument("The policy failed legacy parsing")
try:
self._validate_sid_uniqueness()
except Exception:
raise MalformedPolicyDocument("Statement IDs (SID) in a single policy must be unique.")
try:
self._validate_action_like_exist()
except Exception:
raise MalformedPolicyDocument("Policy statement must contain actions.")
try:
self._validate_resource_exist()
except Exception:
raise MalformedPolicyDocument("Policy statement must contain resources.")
if self._resource_error != "":
raise MalformedPolicyDocument(self._resource_error)
self._validate_actions_for_prefixes()
self._validate_not_actions_for_prefixes()
def _validate_syntax(self):
self._policy_json = json.loads(self._policy_document)
assert isinstance(self._policy_json, dict)
self._validate_top_elements()
self._validate_version_syntax()
self._validate_id_syntax()
self._validate_statements_syntax()
def _validate_top_elements(self):
top_elements = self._policy_json.keys()
for element in top_elements:
assert element in VALID_TOP_ELEMENTS
def _validate_version_syntax(self):
if "Version" in self._policy_json:
assert self._policy_json["Version"] in VALID_VERSIONS
def _validate_version(self):
assert self._policy_json["Version"] == "2012-10-17"
def _validate_sid_uniqueness(self):
sids = []
for statement in self._statements:
if "Sid" in statement:
assert statement["Sid"] not in sids
sids.append(statement["Sid"])
def _validate_statements_syntax(self):
assert "Statement" in self._policy_json
assert isinstance(self._policy_json["Statement"], (dict, list))
if isinstance(self._policy_json["Statement"], dict):
self._statements.append(self._policy_json["Statement"])
else:
self._statements += self._policy_json["Statement"]
assert self._statements
for statement in self._statements:
self._validate_statement_syntax(statement)
@staticmethod
def _validate_statement_syntax(statement):
assert isinstance(statement, dict)
for statement_element in statement.keys():
assert statement_element in VALID_STATEMENT_ELEMENTS
assert ("Resource" not in statement or "NotResource" not in statement)
assert ("Action" not in statement or "NotAction" not in statement)
IAMPolicyDocumentValidator._validate_effect_syntax(statement)
IAMPolicyDocumentValidator._validate_action_syntax(statement)
IAMPolicyDocumentValidator._validate_not_action_syntax(statement)
IAMPolicyDocumentValidator._validate_resource_syntax(statement)
IAMPolicyDocumentValidator._validate_not_resource_syntax(statement)
IAMPolicyDocumentValidator._validate_condition_syntax(statement)
IAMPolicyDocumentValidator._validate_sid_syntax(statement)
@staticmethod
def _validate_effect_syntax(statement):
assert "Effect" in statement
assert isinstance(statement["Effect"], string_types)
assert statement["Effect"].lower() in [allowed_effect.lower() for allowed_effect in VALID_EFFECTS]
@staticmethod
def _validate_action_syntax(statement):
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "Action")
@staticmethod
def _validate_not_action_syntax(statement):
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "NotAction")
@staticmethod
def _validate_resource_syntax(statement):
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "Resource")
@staticmethod
def _validate_not_resource_syntax(statement):
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "NotResource")
@staticmethod
def _validate_string_or_list_of_strings_syntax(statement, key):
if key in statement:
assert isinstance(statement[key], (string_types, list))
if isinstance(statement[key], list):
for resource in statement[key]:
assert isinstance(resource, string_types)
@staticmethod
def _validate_condition_syntax(statement):
if "Condition" in statement:
assert isinstance(statement["Condition"], dict)
for condition_key, condition_value in statement["Condition"].items():
assert isinstance(condition_value, dict)
for condition_element_key, condition_element_value in condition_value.items():
assert isinstance(condition_element_value, (list, string_types))
if IAMPolicyDocumentValidator._strip_condition_key(condition_key) not in VALID_CONDITIONS:
assert not condition_value # empty dict
@staticmethod
def _strip_condition_key(condition_key):
for valid_prefix in VALID_CONDITION_PREFIXES:
if condition_key.startswith(valid_prefix):
condition_key = condition_key[len(valid_prefix):]
break # strip only the first match
for valid_postfix in VALID_CONDITION_POSTFIXES:
if condition_key.endswith(valid_postfix):
condition_key = condition_key[:-len(valid_postfix)]
break # strip only the first match
return condition_key
@staticmethod
def _validate_sid_syntax(statement):
if "Sid" in statement:
assert isinstance(statement["Sid"], string_types)
def _validate_id_syntax(self):
if "Id" in self._policy_json:
assert isinstance(self._policy_json["Id"], string_types)
def _validate_resource_exist(self):
for statement in self._statements:
assert ("Resource" in statement or "NotResource" in statement)
if "Resource" in statement and isinstance(statement["Resource"], list):
assert statement["Resource"]
elif "NotResource" in statement and isinstance(statement["NotResource"], list):
assert statement["NotResource"]
def _validate_action_like_exist(self):
for statement in self._statements:
assert ("Action" in statement or "NotAction" in statement)
if "Action" in statement and isinstance(statement["Action"], list):
assert statement["Action"]
elif "NotAction" in statement and isinstance(statement["NotAction"], list):
assert statement["NotAction"]
def _validate_actions_for_prefixes(self):
self._validate_action_like_for_prefixes("Action")
def _validate_not_actions_for_prefixes(self):
self._validate_action_like_for_prefixes("NotAction")
def _validate_action_like_for_prefixes(self, key):
for statement in self._statements:
if key in statement:
if isinstance(statement[key], string_types):
self._validate_action_prefix(statement[key])
else:
for action in statement[key]:
self._validate_action_prefix(action)
@staticmethod
def _validate_action_prefix(action):
action_parts = action.split(":")
if len(action_parts) == 1 and action_parts[0] != "*":
raise MalformedPolicyDocument("Actions/Conditions must be prefaced by a vendor, e.g., iam, sdb, ec2, etc.")
elif len(action_parts) > 2:
raise MalformedPolicyDocument("Actions/Condition can contain only one colon.")
vendor_pattern = re.compile(r'[^a-zA-Z0-9\-.]')
if action_parts[0] != "*" and vendor_pattern.search(action_parts[0]):
raise MalformedPolicyDocument("Vendor {vendor} is not valid".format(vendor=action_parts[0]))
def _validate_resources_for_formats(self):
self._validate_resource_like_for_formats("Resource")
def _validate_not_resources_for_formats(self):
self._validate_resource_like_for_formats("NotResource")
def _validate_resource_like_for_formats(self, key):
for statement in self._statements:
if key in statement:
if isinstance(statement[key], string_types):
self._validate_resource_format(statement[key])
else:
for resource in sorted(statement[key], reverse=True):
self._validate_resource_format(resource)
if self._resource_error == "":
IAMPolicyDocumentValidator._legacy_parse_resource_like(statement, key)
def _validate_resource_format(self, resource):
if resource != "*":
resource_partitions = resource.partition(":")
if resource_partitions[1] == "":
self._resource_error = 'Resource {resource} must be in ARN format or "*".'.format(resource=resource)
return
resource_partitions = resource_partitions[2].partition(":")
if resource_partitions[0] != "aws":
remaining_resource_parts = resource_partitions[2].split(":")
arn1 = remaining_resource_parts[0] if remaining_resource_parts[0] != "" or len(remaining_resource_parts) > 1 else "*"
arn2 = remaining_resource_parts[1] if len(remaining_resource_parts) > 1 else "*"
arn3 = remaining_resource_parts[2] if len(remaining_resource_parts) > 2 else "*"
arn4 = ":".join(remaining_resource_parts[3:]) if len(remaining_resource_parts) > 3 else "*"
self._resource_error = 'Partition "{partition}" is not valid for resource "arn:{partition}:{arn1}:{arn2}:{arn3}:{arn4}".'.format(
partition=resource_partitions[0],
arn1=arn1,
arn2=arn2,
arn3=arn3,
arn4=arn4
)
return
if resource_partitions[1] != ":":
self._resource_error = "Resource vendor must be fully qualified and cannot contain regexes."
return
resource_partitions = resource_partitions[2].partition(":")
service = resource_partitions[0]
if service in SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS.keys() and not resource_partitions[2].startswith(":"):
self._resource_error = SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS[service].format(resource=resource)
return
resource_partitions = resource_partitions[2].partition(":")
resource_partitions = resource_partitions[2].partition(":")
if service in VALID_RESOURCE_PATH_STARTING_VALUES.keys():
valid_start = False
for valid_starting_value in VALID_RESOURCE_PATH_STARTING_VALUES[service]["values"]:
if resource_partitions[2].startswith(valid_starting_value):
valid_start = True
break
if not valid_start:
self._resource_error = VALID_RESOURCE_PATH_STARTING_VALUES[service]["error_message"].format(
values=", ".join(VALID_RESOURCE_PATH_STARTING_VALUES[service]["values"])
)
def _perform_first_legacy_parsing(self):
"""This method excludes legacy parsing resources, since that have to be done later."""
for statement in self._statements:
self._legacy_parse_statement(statement)
@staticmethod
def _legacy_parse_statement(statement):
assert statement["Effect"] in VALID_EFFECTS # case-sensitive matching
if "Condition" in statement:
for condition_key, condition_value in statement["Condition"].items():
IAMPolicyDocumentValidator._legacy_parse_condition(condition_key, condition_value)
@staticmethod
def _legacy_parse_resource_like(statement, key):
if isinstance(statement[key], string_types):
if statement[key] != "*":
assert statement[key].count(":") >= 5 or "::" not in statement[key]
assert statement[key].split(":")[2] != ""
else: # list
for resource in statement[key]:
if resource != "*":
assert resource.count(":") >= 5 or "::" not in resource
assert resource[2] != ""
@staticmethod
def _legacy_parse_condition(condition_key, condition_value):
stripped_condition_key = IAMPolicyDocumentValidator._strip_condition_key(condition_key)
if stripped_condition_key.startswith("Date"):
for condition_element_key, condition_element_value in condition_value.items():
if isinstance(condition_element_value, string_types):
IAMPolicyDocumentValidator._legacy_parse_date_condition_value(condition_element_value)
else: # it has to be a list
for date_condition_value in condition_element_value:
IAMPolicyDocumentValidator._legacy_parse_date_condition_value(date_condition_value)
@staticmethod
def _legacy_parse_date_condition_value(date_condition_value):
if "t" in date_condition_value.lower() or "-" in date_condition_value:
IAMPolicyDocumentValidator._validate_iso_8601_datetime(date_condition_value.lower())
else: # timestamp
assert 0 <= int(date_condition_value) <= 9223372036854775807
@staticmethod
def _validate_iso_8601_datetime(datetime):
datetime_parts = datetime.partition("t")
negative_year = datetime_parts[0].startswith("-")
date_parts = datetime_parts[0][1:].split("-") if negative_year else datetime_parts[0].split("-")
year = "-" + date_parts[0] if negative_year else date_parts[0]
assert -292275054 <= int(year) <= 292278993
if len(date_parts) > 1:
month = date_parts[1]
assert 1 <= int(month) <= 12
if len(date_parts) > 2:
day = date_parts[2]
assert 1 <= int(day) <= 31
assert len(date_parts) < 4
time_parts = datetime_parts[2].split(":")
if time_parts[0] != "":
hours = time_parts[0]
assert 0 <= int(hours) <= 23
if len(time_parts) > 1:
minutes = time_parts[1]
assert 0 <= int(minutes) <= 59
if len(time_parts) > 2:
if "z" in time_parts[2]:
seconds_with_decimal_fraction = time_parts[2].partition("z")[0]
assert time_parts[2].partition("z")[2] == ""
elif "+" in time_parts[2]:
seconds_with_decimal_fraction = time_parts[2].partition("+")[0]
time_zone_data = time_parts[2].partition("+")[2].partition(":")
time_zone_hours = time_zone_data[0]
assert len(time_zone_hours) == 2
assert 0 <= int(time_zone_hours) <= 23
if time_zone_data[1] == ":":
time_zone_minutes = time_zone_data[2]
assert len(time_zone_minutes) == 2
assert 0 <= int(time_zone_minutes) <= 59
else:
seconds_with_decimal_fraction = time_parts[2]
seconds_with_decimal_fraction_partition = seconds_with_decimal_fraction.partition(".")
seconds = seconds_with_decimal_fraction_partition[0]
assert 0 <= int(seconds) <= 59
if seconds_with_decimal_fraction_partition[1] == ".":
decimal_seconds = seconds_with_decimal_fraction_partition[2]
assert 0 <= int(decimal_seconds) <= 999999999

81
moto/ses/feedback.py Normal file
View File

@ -0,0 +1,81 @@
"""
SES Feedback messages
Extracted from https://docs.aws.amazon.com/ses/latest/DeveloperGuide/notification-contents.html
"""
COMMON_MAIL = {
"notificationType": "Bounce, Complaint, or Delivery.",
"mail": {
"timestamp": "2018-10-08T14:05:45 +0000",
"messageId": "000001378603177f-7a5433e7-8edb-42ae-af10-f0181f34d6ee-000000",
"source": "sender@example.com",
"sourceArn": "arn:aws:ses:us-west-2:888888888888:identity/example.com",
"sourceIp": "127.0.3.0",
"sendingAccountId": "123456789012",
"destination": [
"recipient@example.com"
],
"headersTruncated": False,
"headers": [
{
"name": "From",
"value": "\"Sender Name\" <sender@example.com>"
},
{
"name": "To",
"value": "\"Recipient Name\" <recipient@example.com>"
}
],
"commonHeaders": {
"from": [
"Sender Name <sender@example.com>"
],
"date": "Mon, 08 Oct 2018 14:05:45 +0000",
"to": [
"Recipient Name <recipient@example.com>"
],
"messageId": " custom-message-ID",
"subject": "Message sent using Amazon SES"
}
}
}
BOUNCE = {
"bounceType": "Permanent",
"bounceSubType": "General",
"bouncedRecipients": [
{
"status": "5.0.0",
"action": "failed",
"diagnosticCode": "smtp; 550 user unknown",
"emailAddress": "recipient1@example.com"
},
{
"status": "4.0.0",
"action": "delayed",
"emailAddress": "recipient2@example.com"
}
],
"reportingMTA": "example.com",
"timestamp": "2012-05-25T14:59:38.605Z",
"feedbackId": "000001378603176d-5a4b5ad9-6f30-4198-a8c3-b1eb0c270a1d-000000",
"remoteMtaIp": "127.0.2.0"
}
COMPLAINT = {
"userAgent": "AnyCompany Feedback Loop (V0.01)",
"complainedRecipients": [
{
"emailAddress": "recipient1@example.com"
}
],
"complaintFeedbackType": "abuse",
"arrivalDate": "2009-12-03T04:24:21.000-05:00",
"timestamp": "2012-05-25T14:59:38.623Z",
"feedbackId": "000001378603177f-18c07c78-fa81-4a58-9dd1-fedc3cb8f49a-000000"
}
DELIVERY = {
"timestamp": "2014-05-28T22:41:01.184Z",
"processingTimeMillis": 546,
"recipients": ["success@simulator.amazonses.com"],
"smtpResponse": "250 ok: Message 64111812 accepted",
"reportingMTA": "a8-70.smtp-out.amazonses.com",
"remoteMtaIp": "127.0.2.0"
}

15
moto/sts/exceptions.py Normal file
View File

@ -0,0 +1,15 @@
from __future__ import unicode_literals
from moto.core.exceptions import RESTError
class STSClientError(RESTError):
code = 400
class STSValidationError(STSClientError):
def __init__(self, *args, **kwargs):
super(STSValidationError, self).__init__(
"ValidationError",
*args, **kwargs
)

35
moto/sts/utils.py Normal file
View File

@ -0,0 +1,35 @@
import base64
import os
import random
import string
import six
ACCOUNT_SPECIFIC_ACCESS_KEY_PREFIX = "8NWMTLYQ"
ACCOUNT_SPECIFIC_ASSUMED_ROLE_ID_PREFIX = "3X42LBCD"
SESSION_TOKEN_PREFIX = "FQoGZXIvYXdzEBYaD"
def random_access_key_id():
return ACCOUNT_SPECIFIC_ACCESS_KEY_PREFIX + _random_uppercase_or_digit_sequence(8)
def random_secret_access_key():
return base64.b64encode(os.urandom(30)).decode()
def random_session_token():
return SESSION_TOKEN_PREFIX + base64.b64encode(os.urandom(266))[len(SESSION_TOKEN_PREFIX):].decode()
def random_assumed_role_id():
return ACCOUNT_SPECIFIC_ASSUMED_ROLE_ID_PREFIX + _random_uppercase_or_digit_sequence(9)
def _random_uppercase_or_digit_sequence(length):
return ''.join(
six.text_type(
random.choice(
string.ascii_uppercase + string.digits
)) for _ in range(length)
)

View File

@ -0,0 +1,25 @@
package com.amazonaws.examples
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import scala.jdk.CollectionConverters._
object QueueTest extends App {
val region = Region.getRegion(Regions.US_WEST_2).getName
val serviceEndpoint = "http://localhost:5000"
val amazonSqs = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region))
.build
val queueName = "my-first-queue"
amazonSqs.createQueue(queueName)
val urls = amazonSqs.listQueues().getQueueUrls.asScala
println("Listing queues")
println(urls.map(url => s" - $url").mkString(System.lineSeparator))
println()
}

View File

@ -0,0 +1,706 @@
import json
import boto3
import sure # noqa
from botocore.exceptions import ClientError
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
from moto import mock_iam, mock_ec2, mock_s3, mock_sts, mock_elbv2, mock_rds2
from moto.core import set_initial_no_auth_action_count
from moto.iam.models import ACCOUNT_ID
@mock_iam
def create_user_with_access_key(user_name='test-user'):
client = boto3.client('iam', region_name='us-east-1')
client.create_user(UserName=user_name)
return client.create_access_key(UserName=user_name)['AccessKey']
@mock_iam
def create_user_with_access_key_and_inline_policy(user_name, policy_document, policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_user(UserName=user_name)
client.put_user_policy(UserName=user_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document))
return client.create_access_key(UserName=user_name)['AccessKey']
@mock_iam
def create_user_with_access_key_and_attached_policy(user_name, policy_document, policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_user(UserName=user_name)
policy_arn = client.create_policy(
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)['Policy']['Arn']
client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
return client.create_access_key(UserName=user_name)['AccessKey']
@mock_iam
def create_user_with_access_key_and_multiple_policies(user_name, inline_policy_document,
attached_policy_document, inline_policy_name='policy1', attached_policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_user(UserName=user_name)
policy_arn = client.create_policy(
PolicyName=attached_policy_name,
PolicyDocument=json.dumps(attached_policy_document)
)['Policy']['Arn']
client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
client.put_user_policy(UserName=user_name, PolicyName=inline_policy_name, PolicyDocument=json.dumps(inline_policy_document))
return client.create_access_key(UserName=user_name)['AccessKey']
def create_group_with_attached_policy_and_add_user(user_name, policy_document,
group_name='test-group', policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_group(GroupName=group_name)
policy_arn = client.create_policy(
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)['Policy']['Arn']
client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
client.add_user_to_group(GroupName=group_name, UserName=user_name)
def create_group_with_inline_policy_and_add_user(user_name, policy_document,
group_name='test-group', policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_group(GroupName=group_name)
client.put_group_policy(
GroupName=group_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)
client.add_user_to_group(GroupName=group_name, UserName=user_name)
def create_group_with_multiple_policies_and_add_user(user_name, inline_policy_document,
attached_policy_document, group_name='test-group',
inline_policy_name='policy1', attached_policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_group(GroupName=group_name)
client.put_group_policy(
GroupName=group_name,
PolicyName=inline_policy_name,
PolicyDocument=json.dumps(inline_policy_document)
)
policy_arn = client.create_policy(
PolicyName=attached_policy_name,
PolicyDocument=json.dumps(attached_policy_document)
)['Policy']['Arn']
client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
client.add_user_to_group(GroupName=group_name, UserName=user_name)
@mock_iam
@mock_sts
def create_role_with_attached_policy_and_assume_it(role_name, trust_policy_document,
policy_document, session_name='session1', policy_name='policy1'):
iam_client = boto3.client('iam', region_name='us-east-1')
sts_client = boto3.client('sts', region_name='us-east-1')
role_arn = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy_document)
)['Role']['Arn']
policy_arn = iam_client.create_policy(
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)['Policy']['Arn']
iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
return sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)['Credentials']
@mock_iam
@mock_sts
def create_role_with_inline_policy_and_assume_it(role_name, trust_policy_document,
policy_document, session_name='session1', policy_name='policy1'):
iam_client = boto3.client('iam', region_name='us-east-1')
sts_client = boto3.client('sts', region_name='us-east-1')
role_arn = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy_document)
)['Role']['Arn']
iam_client.put_role_policy(
RoleName=role_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)
return sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)['Credentials']
@set_initial_no_auth_action_count(0)
@mock_iam
def test_invalid_client_token_id():
client = boto3.client('iam', region_name='us-east-1', aws_access_key_id='invalid', aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.get_user()
ex.exception.response['Error']['Code'].should.equal('InvalidClientTokenId')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('The security token included in the request is invalid.')
@set_initial_no_auth_action_count(0)
@mock_ec2
def test_auth_failure():
client = boto3.client('ec2', region_name='us-east-1', aws_access_key_id='invalid', aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.describe_instances()
ex.exception.response['Error']['Code'].should.equal('AuthFailure')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(401)
ex.exception.response['Error']['Message'].should.equal('AWS was not able to validate the provided access credentials')
@set_initial_no_auth_action_count(2)
@mock_iam
def test_signature_does_not_match():
access_key = create_user_with_access_key()
client = boto3.client('iam', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.get_user()
ex.exception.response['Error']['Code'].should.equal('SignatureDoesNotMatch')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.')
@set_initial_no_auth_action_count(2)
@mock_ec2
def test_auth_failure_with_valid_access_key_id():
access_key = create_user_with_access_key()
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.describe_instances()
ex.exception.response['Error']['Code'].should.equal('AuthFailure')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(401)
ex.exception.response['Error']['Message'].should.equal('AWS was not able to validate the provided access credentials')
@set_initial_no_auth_action_count(2)
@mock_ec2
def test_access_denied_with_no_policy():
user_name = 'test-user'
access_key = create_user_with_access_key(user_name)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.describe_instances()
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:iam::{account_id}:user/{user_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
user_name=user_name,
operation="ec2:DescribeInstances"
)
)
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_with_not_allowing_policy():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:Describe*"
],
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.run_instances(MaxCount=1, MinCount=1)
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:iam::{account_id}:user/{user_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
user_name=user_name,
operation="ec2:RunInstances"
)
)
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_with_denying_policy():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:*",
],
"Resource": "*"
},
{
"Effect": "Deny",
"Action": "ec2:CreateVpc",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.create_vpc(CidrBlock="10.0.0.0/16")
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:iam::{account_id}:user/{user_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
user_name=user_name,
operation="ec2:CreateVpc"
)
)
@set_initial_no_auth_action_count(3)
@mock_sts
def test_get_caller_identity_allowed_with_denying_policy():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "sts:GetCallerIdentity",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('sts', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.get_caller_identity().should.be.a(dict)
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_allowed_with_wildcard_action():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:Describe*",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.describe_tags()['Tags'].should.be.empty
@set_initial_no_auth_action_count(4)
@mock_iam
def test_allowed_with_explicit_action_in_attached_policy():
user_name = 'test-user'
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:ListGroups",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_attached_policy(user_name, attached_policy_document)
client = boto3.client('iam', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.list_groups()['Groups'].should.be.empty
@set_initial_no_auth_action_count(8)
@mock_s3
@mock_iam
def test_s3_access_denied_with_denying_attached_group_policy():
user_name = 'test-user'
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:ListAllMyBuckets",
"Resource": "*"
}
]
}
group_attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "s3:List*",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_attached_policy(user_name, attached_policy_document)
create_group_with_attached_policy_and_add_user(user_name, group_attached_policy_document)
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.list_buckets()
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('Access Denied')
@set_initial_no_auth_action_count(6)
@mock_s3
@mock_iam
def test_s3_access_denied_with_denying_inline_group_policy():
user_name = 'test-user'
bucket_name = 'test-bucket'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "*",
"Resource": "*"
}
]
}
group_inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "s3:GetObject",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
create_group_with_inline_policy_and_add_user(user_name, group_inline_policy_document)
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.create_bucket(Bucket=bucket_name)
with assert_raises(ClientError) as ex:
client.get_object(Bucket=bucket_name, Key='sdfsdf')
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('Access Denied')
@set_initial_no_auth_action_count(10)
@mock_iam
@mock_ec2
def test_access_denied_with_many_irrelevant_policies():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:Describe*",
"Resource": "*"
}
]
}
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": "*"
}
]
}
group_inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "iam:List*",
"Resource": "*"
}
]
}
group_attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "lambda:*",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_multiple_policies(user_name, inline_policy_document,
attached_policy_document)
create_group_with_multiple_policies_and_add_user(user_name, group_inline_policy_document,
group_attached_policy_document)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.create_key_pair(KeyName="TestKey")
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:iam::{account_id}:user/{user_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
user_name=user_name,
operation="ec2:CreateKeyPair"
)
)
@set_initial_no_auth_action_count(4)
@mock_iam
@mock_sts
@mock_ec2
@mock_elbv2
def test_allowed_with_temporary_credentials():
role_name = 'test-role'
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)},
"Action": "sts:AssumeRole"
}
}
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticloadbalancing:CreateLoadBalancer",
"ec2:DescribeSubnets"
],
"Resource": "*"
}
]
}
credentials = create_role_with_attached_policy_and_assume_it(role_name, trust_policy_document, attached_policy_document)
elbv2_client = boto3.client('elbv2', region_name='us-east-1',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'])
ec2_client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'])
subnets = ec2_client.describe_subnets()['Subnets']
len(subnets).should.be.greater_than(1)
elbv2_client.create_load_balancer(
Name='test-load-balancer',
Subnets=[
subnets[0]['SubnetId'],
subnets[1]['SubnetId']
]
)['LoadBalancers'].should.have.length_of(1)
@set_initial_no_auth_action_count(3)
@mock_iam
@mock_sts
@mock_rds2
def test_access_denied_with_temporary_credentials():
role_name = 'test-role'
session_name = 'test-session'
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)},
"Action": "sts:AssumeRole"
}
}
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
'rds:Describe*'
],
"Resource": "*"
}
]
}
credentials = create_role_with_inline_policy_and_assume_it(role_name, trust_policy_document,
attached_policy_document, session_name)
client = boto3.client('rds', region_name='us-east-1',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'])
with assert_raises(ClientError) as ex:
client.create_db_instance(
DBInstanceIdentifier='test-db-instance',
DBInstanceClass='db.t3',
Engine='aurora-postgresql'
)
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:sts::{account_id}:assumed-role/{role_name}/{session_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
role_name=role_name,
session_name=session_name,
operation="rds:CreateDBInstance"
)
)
@set_initial_no_auth_action_count(3)
@mock_iam
def test_get_user_from_credentials():
user_name = 'new-test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:*",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('iam', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.get_user()['User']['UserName'].should.equal(user_name)
@set_initial_no_auth_action_count(0)
@mock_s3
def test_s3_invalid_access_key_id():
client = boto3.client('s3', region_name='us-east-1', aws_access_key_id='invalid', aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.list_buckets()
ex.exception.response['Error']['Code'].should.equal('InvalidAccessKeyId')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('The AWS Access Key Id you provided does not exist in our records.')
@set_initial_no_auth_action_count(3)
@mock_s3
@mock_iam
def test_s3_signature_does_not_match():
bucket_name = 'test-bucket'
access_key = create_user_with_access_key()
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key='invalid')
client.create_bucket(Bucket=bucket_name)
with assert_raises(ClientError) as ex:
client.put_object(Bucket=bucket_name, Key="abc")
ex.exception.response['Error']['Code'].should.equal('SignatureDoesNotMatch')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('The request signature we calculated does not match the signature you provided. Check your key and signing method.')
@set_initial_no_auth_action_count(7)
@mock_s3
@mock_iam
def test_s3_access_denied_not_action():
user_name = 'test-user'
bucket_name = 'test-bucket'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "*",
"Resource": "*"
}
]
}
group_inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"NotAction": "iam:GetUser",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
create_group_with_inline_policy_and_add_user(user_name, group_inline_policy_document)
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.create_bucket(Bucket=bucket_name)
with assert_raises(ClientError) as ex:
client.delete_object(Bucket=bucket_name, Key='sdfsdf')
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('Access Denied')
@set_initial_no_auth_action_count(4)
@mock_iam
@mock_sts
@mock_s3
def test_s3_invalid_token_with_temporary_credentials():
role_name = 'test-role'
session_name = 'test-session'
bucket_name = 'test-bucket-888'
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)},
"Action": "sts:AssumeRole"
}
}
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
'*'
],
"Resource": "*"
}
]
}
credentials = create_role_with_inline_policy_and_assume_it(role_name, trust_policy_document,
attached_policy_document, session_name)
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token='invalid')
client.create_bucket(Bucket=bucket_name)
with assert_raises(ClientError) as ex:
client.list_bucket_metrics_configurations(Bucket=bucket_name)
ex.exception.response['Error']['Code'].should.equal('InvalidToken')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
ex.exception.response['Error']['Message'].should.equal('The provided token is malformed or otherwise invalid.')

View File

@ -0,0 +1,12 @@
import sure # noqa
import boto3
from moto import mock_sqs, settings
def test_context_manager_returns_mock():
with mock_sqs() as sqs_mock:
conn = boto3.client("sqs", region_name='us-west-1')
conn.create_queue(QueueName="queue1")
if not settings.TEST_SERVER_MODE:
list(sqs_mock.backends['us-west-1'].queues.keys()).should.equal(['queue1'])

View File

@ -0,0 +1,48 @@
import unittest
from moto import mock_dynamodb2_deprecated, mock_dynamodb2
import socket
from six import PY3
class TestSocketPair(unittest.TestCase):
@mock_dynamodb2_deprecated
def test_asyncio_deprecated(self):
if PY3:
self.assertIn(
'moto.packages.httpretty.core.fakesock.socket',
str(socket.socket),
'Our mock should be present'
)
import asyncio
self.assertIsNotNone(asyncio.get_event_loop())
@mock_dynamodb2_deprecated
def test_socket_pair_deprecated(self):
# In Python2, the fakesocket is not set, for some reason.
if PY3:
self.assertIn(
'moto.packages.httpretty.core.fakesock.socket',
str(socket.socket),
'Our mock should be present'
)
a, b = socket.socketpair()
self.assertIsNotNone(a)
self.assertIsNotNone(b)
if a:
a.close()
if b:
b.close()
@mock_dynamodb2
def test_socket_pair(self):
a, b = socket.socketpair()
self.assertIsNotNone(a)
self.assertIsNotNone(b)
if a:
a.close()
if b:
b.close()

View File

@ -0,0 +1,415 @@
import boto3
import sure # noqa
from nose.tools import assert_raises
from botocore.client import ClientError
from moto import mock_ec2
@mock_ec2
def test_launch_template_create():
cli = boto3.client("ec2", region_name="us-east-1")
resp = cli.create_launch_template(
LaunchTemplateName="test-template",
# the absolute minimum needed to create a template without other resources
LaunchTemplateData={
"TagSpecifications": [{
"ResourceType": "instance",
"Tags": [{
"Key": "test",
"Value": "value",
}],
}],
},
)
resp.should.have.key("LaunchTemplate")
lt = resp["LaunchTemplate"]
lt["LaunchTemplateName"].should.equal("test-template")
lt["DefaultVersionNumber"].should.equal(1)
lt["LatestVersionNumber"].should.equal(1)
with assert_raises(ClientError) as ex:
cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"TagSpecifications": [{
"ResourceType": "instance",
"Tags": [{
"Key": "test",
"Value": "value",
}],
}],
},
)
str(ex.exception).should.equal(
'An error occurred (InvalidLaunchTemplateName.AlreadyExistsException) when calling the CreateLaunchTemplate operation: Launch template name already in use.')
@mock_ec2
def test_describe_launch_template_versions():
template_data = {
"ImageId": "ami-abc123",
"DisableApiTermination": False,
"TagSpecifications": [{
"ResourceType": "instance",
"Tags": [{
"Key": "test",
"Value": "value",
}],
}],
"SecurityGroupIds": [
"sg-1234",
"sg-ab5678",
],
}
cli = boto3.client("ec2", region_name="us-east-1")
create_resp = cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData=template_data)
# test using name
resp = cli.describe_launch_template_versions(
LaunchTemplateName="test-template",
Versions=['1'])
templ = resp["LaunchTemplateVersions"][0]["LaunchTemplateData"]
templ.should.equal(template_data)
# test using id
resp = cli.describe_launch_template_versions(
LaunchTemplateId=create_resp["LaunchTemplate"]["LaunchTemplateId"],
Versions=['1'])
templ = resp["LaunchTemplateVersions"][0]["LaunchTemplateData"]
templ.should.equal(template_data)
@mock_ec2
def test_create_launch_template_version():
cli = boto3.client("ec2", region_name="us-east-1")
create_resp = cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
version_resp = cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-def456"
},
VersionDescription="new ami")
version_resp.should.have.key("LaunchTemplateVersion")
version = version_resp["LaunchTemplateVersion"]
version["DefaultVersion"].should.equal(False)
version["LaunchTemplateId"].should.equal(create_resp["LaunchTemplate"]["LaunchTemplateId"])
version["VersionDescription"].should.equal("new ami")
version["VersionNumber"].should.equal(2)
@mock_ec2
def test_create_launch_template_version_by_id():
cli = boto3.client("ec2", region_name="us-east-1")
create_resp = cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
version_resp = cli.create_launch_template_version(
LaunchTemplateId=create_resp["LaunchTemplate"]["LaunchTemplateId"],
LaunchTemplateData={
"ImageId": "ami-def456"
},
VersionDescription="new ami")
version_resp.should.have.key("LaunchTemplateVersion")
version = version_resp["LaunchTemplateVersion"]
version["DefaultVersion"].should.equal(False)
version["LaunchTemplateId"].should.equal(create_resp["LaunchTemplate"]["LaunchTemplateId"])
version["VersionDescription"].should.equal("new ami")
version["VersionNumber"].should.equal(2)
@mock_ec2
def test_describe_launch_template_versions_with_multiple_versions():
cli = boto3.client("ec2", region_name="us-east-1")
cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-def456"
},
VersionDescription="new ami")
resp = cli.describe_launch_template_versions(
LaunchTemplateName="test-template")
resp["LaunchTemplateVersions"].should.have.length_of(2)
resp["LaunchTemplateVersions"][0]["LaunchTemplateData"]["ImageId"].should.equal("ami-abc123")
resp["LaunchTemplateVersions"][1]["LaunchTemplateData"]["ImageId"].should.equal("ami-def456")
@mock_ec2
def test_describe_launch_template_versions_with_versions_option():
cli = boto3.client("ec2", region_name="us-east-1")
cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-def456"
},
VersionDescription="new ami")
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-hij789"
},
VersionDescription="new ami, again")
resp = cli.describe_launch_template_versions(
LaunchTemplateName="test-template",
Versions=["2", "3"])
resp["LaunchTemplateVersions"].should.have.length_of(2)
resp["LaunchTemplateVersions"][0]["LaunchTemplateData"]["ImageId"].should.equal("ami-def456")
resp["LaunchTemplateVersions"][1]["LaunchTemplateData"]["ImageId"].should.equal("ami-hij789")
@mock_ec2
def test_describe_launch_template_versions_with_min():
cli = boto3.client("ec2", region_name="us-east-1")
cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-def456"
},
VersionDescription="new ami")
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-hij789"
},
VersionDescription="new ami, again")
resp = cli.describe_launch_template_versions(
LaunchTemplateName="test-template",
MinVersion="2")
resp["LaunchTemplateVersions"].should.have.length_of(2)
resp["LaunchTemplateVersions"][0]["LaunchTemplateData"]["ImageId"].should.equal("ami-def456")
resp["LaunchTemplateVersions"][1]["LaunchTemplateData"]["ImageId"].should.equal("ami-hij789")
@mock_ec2
def test_describe_launch_template_versions_with_max():
cli = boto3.client("ec2", region_name="us-east-1")
cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-def456"
},
VersionDescription="new ami")
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-hij789"
},
VersionDescription="new ami, again")
resp = cli.describe_launch_template_versions(
LaunchTemplateName="test-template",
MaxVersion="2")
resp["LaunchTemplateVersions"].should.have.length_of(2)
resp["LaunchTemplateVersions"][0]["LaunchTemplateData"]["ImageId"].should.equal("ami-abc123")
resp["LaunchTemplateVersions"][1]["LaunchTemplateData"]["ImageId"].should.equal("ami-def456")
@mock_ec2
def test_describe_launch_template_versions_with_min_and_max():
cli = boto3.client("ec2", region_name="us-east-1")
cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-def456"
},
VersionDescription="new ami")
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-hij789"
},
VersionDescription="new ami, again")
cli.create_launch_template_version(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-345abc"
},
VersionDescription="new ami, because why not")
resp = cli.describe_launch_template_versions(
LaunchTemplateName="test-template",
MinVersion="2",
MaxVersion="3")
resp["LaunchTemplateVersions"].should.have.length_of(2)
resp["LaunchTemplateVersions"][0]["LaunchTemplateData"]["ImageId"].should.equal("ami-def456")
resp["LaunchTemplateVersions"][1]["LaunchTemplateData"]["ImageId"].should.equal("ami-hij789")
@mock_ec2
def test_describe_launch_templates():
cli = boto3.client("ec2", region_name="us-east-1")
lt_ids = []
r = cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
lt_ids.append(r["LaunchTemplate"]["LaunchTemplateId"])
r = cli.create_launch_template(
LaunchTemplateName="test-template2",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
lt_ids.append(r["LaunchTemplate"]["LaunchTemplateId"])
# general call, all templates
resp = cli.describe_launch_templates()
resp.should.have.key("LaunchTemplates")
resp["LaunchTemplates"].should.have.length_of(2)
resp["LaunchTemplates"][0]["LaunchTemplateName"].should.equal("test-template")
resp["LaunchTemplates"][1]["LaunchTemplateName"].should.equal("test-template2")
# filter by names
resp = cli.describe_launch_templates(
LaunchTemplateNames=["test-template2", "test-template"])
resp.should.have.key("LaunchTemplates")
resp["LaunchTemplates"].should.have.length_of(2)
resp["LaunchTemplates"][0]["LaunchTemplateName"].should.equal("test-template2")
resp["LaunchTemplates"][1]["LaunchTemplateName"].should.equal("test-template")
# filter by ids
resp = cli.describe_launch_templates(LaunchTemplateIds=lt_ids)
resp.should.have.key("LaunchTemplates")
resp["LaunchTemplates"].should.have.length_of(2)
resp["LaunchTemplates"][0]["LaunchTemplateName"].should.equal("test-template")
resp["LaunchTemplates"][1]["LaunchTemplateName"].should.equal("test-template2")
@mock_ec2
def test_describe_launch_templates_with_filters():
cli = boto3.client("ec2", region_name="us-east-1")
r = cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
cli.create_tags(
Resources=[r["LaunchTemplate"]["LaunchTemplateId"]],
Tags=[
{"Key": "tag1", "Value": "a value"},
{"Key": "another-key", "Value": "this value"},
])
cli.create_launch_template(
LaunchTemplateName="no-tags",
LaunchTemplateData={
"ImageId": "ami-abc123"
})
resp = cli.describe_launch_templates(Filters=[{
"Name": "tag:tag1", "Values": ["a value"]
}])
resp["LaunchTemplates"].should.have.length_of(1)
resp["LaunchTemplates"][0]["LaunchTemplateName"].should.equal("test-template")
resp = cli.describe_launch_templates(Filters=[{
"Name": "launch-template-name", "Values": ["no-tags"]
}])
resp["LaunchTemplates"].should.have.length_of(1)
resp["LaunchTemplates"][0]["LaunchTemplateName"].should.equal("no-tags")
@mock_ec2
def test_create_launch_template_with_tag_spec():
cli = boto3.client("ec2", region_name="us-east-1")
cli.create_launch_template(
LaunchTemplateName="test-template",
LaunchTemplateData={"ImageId": "ami-abc123"},
TagSpecifications=[{
"ResourceType": "instance",
"Tags": [
{"Key": "key", "Value": "value"}
]
}],
)
resp = cli.describe_launch_template_versions(
LaunchTemplateName="test-template",
Versions=["1"])
version = resp["LaunchTemplateVersions"][0]
version["LaunchTemplateData"].should.have.key("TagSpecifications")
version["LaunchTemplateData"]["TagSpecifications"].should.have.length_of(1)
version["LaunchTemplateData"]["TagSpecifications"][0].should.equal({
"ResourceType": "instance",
"Tags": [
{"Key": "key", "Value": "value"}
]
})

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,114 @@
from __future__ import unicode_literals
import boto3
import json
from botocore.exceptions import ClientError
from six.moves.email_mime_multipart import MIMEMultipart
from six.moves.email_mime_text import MIMEText
import sure # noqa
from nose import tools
from moto import mock_ses, mock_sns, mock_sqs
from moto.ses.models import SESFeedback
@mock_ses
def test_enable_disable_ses_sns_communication():
conn = boto3.client('ses', region_name='us-east-1')
conn.set_identity_notification_topic(
Identity='test.com',
NotificationType='Bounce',
SnsTopic='the-arn'
)
conn.set_identity_notification_topic(
Identity='test.com',
NotificationType='Bounce'
)
def __setup_feedback_env__(ses_conn, sns_conn, sqs_conn, domain, topic, queue, region, expected_msg):
"""Setup the AWS environment to test the SES SNS Feedback"""
# Environment setup
# Create SQS queue
sqs_conn.create_queue(QueueName=queue)
# Create SNS topic
create_topic_response = sns_conn.create_topic(Name=topic)
topic_arn = create_topic_response["TopicArn"]
# Subscribe the SNS topic to the SQS queue
sns_conn.subscribe(TopicArn=topic_arn,
Protocol="sqs",
Endpoint="arn:aws:sqs:%s:123456789012:%s" % (region, queue))
# Verify SES domain
ses_conn.verify_domain_identity(Domain=domain)
# Setup SES notification topic
if expected_msg is not None:
ses_conn.set_identity_notification_topic(
Identity=domain,
NotificationType=expected_msg,
SnsTopic=topic_arn
)
def __test_sns_feedback__(addr, expected_msg):
region_name = "us-east-1"
ses_conn = boto3.client('ses', region_name=region_name)
sns_conn = boto3.client('sns', region_name=region_name)
sqs_conn = boto3.resource('sqs', region_name=region_name)
domain = "example.com"
topic = "bounce-arn-feedback"
queue = "feedback-test-queue"
__setup_feedback_env__(ses_conn, sns_conn, sqs_conn, domain, topic, queue, region_name, expected_msg)
# Send the message
kwargs = dict(
Source="test@" + domain,
Destination={
"ToAddresses": [addr + "@" + domain],
"CcAddresses": ["test_cc@" + domain],
"BccAddresses": ["test_bcc@" + domain],
},
Message={
"Subject": {"Data": "test subject"},
"Body": {"Text": {"Data": "test body"}}
}
)
ses_conn.send_email(**kwargs)
# Wait for messages in the queues
queue = sqs_conn.get_queue_by_name(QueueName=queue)
messages = queue.receive_messages(MaxNumberOfMessages=1)
if expected_msg is not None:
msg = messages[0].body
msg = json.loads(msg)
assert msg["Message"] == SESFeedback.generate_message(expected_msg)
else:
assert len(messages) == 0
@mock_sqs
@mock_sns
@mock_ses
def test_no_sns_feedback():
__test_sns_feedback__("test", None)
@mock_sqs
@mock_sns
@mock_ses
def test_sns_feedback_bounce():
__test_sns_feedback__(SESFeedback.BOUNCE_ADDR, SESFeedback.BOUNCE)
@mock_sqs
@mock_sns
@mock_ses
def test_sns_feedback_complaint():
__test_sns_feedback__(SESFeedback.COMPLAINT_ADDR, SESFeedback.COMPLAINT)
@mock_sqs
@mock_sns
@mock_ses
def test_sns_feedback_delivery():
__test_sns_feedback__(SESFeedback.SUCCESS_ADDR, SESFeedback.DELIVERY)

120
update_version_from_git.py Normal file
View File

@ -0,0 +1,120 @@
"""
Adapted from https://github.com/pygame/pygameweb/blob/master/pygameweb/builds/update_version_from_git.py
For updating the version from git.
__init__.py contains a __version__ field.
Update that.
If we are on master, we want to update the version as a pre-release.
git describe --tags
With these:
__init__.py
__version__= '0.0.2'
git describe --tags
0.0.1-22-g729a5ae
We want this:
__init__.py
__version__= '0.0.2.dev22.g729a5ae'
Get the branch/tag name with this.
git symbolic-ref -q --short HEAD || git describe --tags --exact-match
"""
import io
import os
import re
import subprocess
def migrate_source_attribute(attr, to_this, target_file, regex):
"""Updates __magic__ attributes in the source file"""
change_this = re.compile(regex, re.S)
new_file = []
found = False
with open(target_file, 'r') as fp:
lines = fp.readlines()
for line in lines:
if line.startswith(attr):
found = True
line = re.sub(change_this, to_this, line)
new_file.append(line)
if found:
with open(target_file, 'w') as fp:
fp.writelines(new_file)
def migrate_version(target_file, new_version):
"""Updates __version__ in the source file"""
regex = r"['\"](.*)['\"]"
migrate_source_attribute('__version__', "'{new_version}'".format(new_version=new_version), target_file, regex)
def is_master_branch():
cmd = ('git rev-parse --abbrev-ref HEAD')
tag_branch = subprocess.check_output(cmd, shell=True)
return tag_branch in [b'master\n']
def git_tag_name():
cmd = ('git describe --tags')
tag_branch = subprocess.check_output(cmd, shell=True)
tag_branch = tag_branch.decode().strip()
return tag_branch
def get_git_version_info():
cmd = 'git describe --tags'
ver_str = subprocess.check_output(cmd, shell=True)
ver, commits_since, githash = ver_str.decode().strip().split('-')
return ver, commits_since, githash
def prerelease_version():
""" return what the prerelease version should be.
https://packaging.python.org/tutorials/distributing-packages/#pre-release-versioning
0.0.2.dev22
"""
ver, commits_since, githash = get_git_version_info()
initpy_ver = get_version()
assert len(initpy_ver.split('.')) in [3, 4], 'moto/__init__.py version should be like 0.0.2.dev'
assert initpy_ver > ver, 'the moto/__init__.py version should be newer than the last tagged release.'
return '{initpy_ver}.{commits_since}'.format(initpy_ver=initpy_ver, commits_since=commits_since)
def read(*parts):
""" Reads in file from *parts.
"""
try:
return io.open(os.path.join(*parts), 'r', encoding='utf-8').read()
except IOError:
return ''
def get_version():
""" Returns version from moto/__init__.py
"""
version_file = read('moto', '__init__.py')
version_match = re.search(r'^__version__ = [\'"]([^\'"]*)[\'"]',
version_file, re.MULTILINE)
if version_match:
return version_match.group(1)
raise RuntimeError('Unable to find version string.')
def release_version_correct():
"""Makes sure the:
- prerelease verion for master is correct.
- release version is correct for tags.
"""
if is_master_branch():
# update for a pre release version.
initpy = os.path.abspath("moto/__init__.py")
new_version = prerelease_version()
print('updating version in __init__.py to {new_version}'.format(new_version=new_version))
assert len(new_version.split('.')) >= 4, 'moto/__init__.py version should be like 0.0.2.dev'
migrate_version(initpy, new_version)
else:
assert False, "No non-master deployments yet"
# check that we are a tag with the same version as in __init__.py
assert get_version() == git_tag_name(), 'git tag/branch name not the same as moto/__init__.py __verion__'
if __name__ == '__main__':
release_version_correct()