Merge pull request #2 from acsbendi/policy-validation

Policy validation
This commit is contained in:
Bendegúz Ács 2019-07-01 19:03:02 +02:00 committed by GitHub
commit 0b51dd47f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 2414 additions and 31 deletions

View File

@ -42,6 +42,14 @@ class MalformedCertificate(RESTError):
'MalformedCertificate', 'Certificate {cert} is malformed'.format(cert=cert))
class MalformedPolicyDocument(RESTError):
code = 400
def __init__(self, message=""):
super(MalformedPolicyDocument, self).__init__(
'MalformedPolicyDocument', message)
class DuplicateTags(RESTError):
code = 400

View File

@ -11,6 +11,7 @@ from cryptography.hazmat.backends import default_backend
from moto.core.exceptions import RESTError
from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_without_milliseconds, iso_8601_datetime_with_milliseconds
from moto.iam.policy_validation import IAMPolicyDocumentValidator
from .aws_managed_policies import aws_managed_policies_data
from .exceptions import IAMNotFoundException, IAMConflictException, IAMReportNotPresentException, IAMLimitExceededException, \
@ -575,6 +576,9 @@ class IAMBackend(BaseBackend):
policy.detach_from(self.get_user(user_name))
def create_policy(self, description, path, policy_document, policy_name):
iam_policy_document_validator = IAMPolicyDocumentValidator(policy_document)
iam_policy_document_validator.validate()
policy = ManagedPolicy(
policy_name,
description=description,
@ -667,6 +671,9 @@ class IAMBackend(BaseBackend):
def put_role_policy(self, role_name, policy_name, policy_json):
role = self.get_role(role_name)
iam_policy_document_validator = IAMPolicyDocumentValidator(policy_json)
iam_policy_document_validator.validate()
role.put_policy(policy_name, policy_json)
def delete_role_policy(self, role_name, policy_name):
@ -764,9 +771,13 @@ class IAMBackend(BaseBackend):
role.tags.pop(ref_key, None)
def create_policy_version(self, policy_arn, policy_document, set_as_default):
iam_policy_document_validator = IAMPolicyDocumentValidator(policy_document)
iam_policy_document_validator.validate()
policy = self.get_policy(policy_arn)
if not policy:
raise IAMNotFoundException("Policy not found")
if len(policy.versions) >= 5:
raise IAMLimitExceededException("A managed policy can have up to 5 versions. Before you create a new version, you must delete an existing version.")
set_as_default = (set_as_default == "true") # convert it to python bool
@ -911,6 +922,9 @@ class IAMBackend(BaseBackend):
def put_group_policy(self, group_name, policy_name, policy_json):
group = self.get_group(group_name)
iam_policy_document_validator = IAMPolicyDocumentValidator(policy_json)
iam_policy_document_validator.validate()
group.put_policy(policy_name, policy_json)
def list_group_policies(self, group_name, marker=None, max_items=None):
@ -1071,6 +1085,9 @@ class IAMBackend(BaseBackend):
def put_user_policy(self, user_name, policy_name, policy_json):
user = self.get_user(user_name)
iam_policy_document_validator = IAMPolicyDocumentValidator(policy_json)
iam_policy_document_validator.validate()
user.put_policy(policy_name, policy_json)
def delete_user_policy(self, user_name, policy_name):

View File

@ -0,0 +1,450 @@
import json
import re
from six import string_types
from moto.iam.exceptions import MalformedPolicyDocument
VALID_TOP_ELEMENTS = [
"Version",
"Id",
"Statement",
"Conditions"
]
VALID_VERSIONS = [
"2008-10-17",
"2012-10-17"
]
VALID_STATEMENT_ELEMENTS = [
"Sid",
"Action",
"NotAction",
"Resource",
"NotResource",
"Effect",
"Condition"
]
VALID_EFFECTS = [
"Allow",
"Deny"
]
VALID_CONDITIONS = [
"StringEquals",
"StringNotEquals",
"StringEqualsIgnoreCase",
"StringNotEqualsIgnoreCase",
"StringLike",
"StringNotLike",
"NumericEquals",
"NumericNotEquals",
"NumericLessThan",
"NumericLessThanEquals",
"NumericGreaterThan",
"NumericGreaterThanEquals",
"DateEquals",
"DateNotEquals",
"DateLessThan",
"DateLessThanEquals",
"DateGreaterThan",
"DateGreaterThanEquals",
"Bool",
"BinaryEquals",
"IpAddress",
"NotIpAddress",
"ArnEquals",
"ArnLike",
"ArnNotEquals",
"ArnNotLike",
"Null"
]
VALID_CONDITION_PREFIXES = [
"ForAnyValue:",
"ForAllValues:"
]
VALID_CONDITION_POSTFIXES = [
"IfExists"
]
SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS = {
"iam": 'IAM resource {resource} cannot contain region information.',
"s3": 'Resource {resource} can not contain region information.'
}
VALID_RESOURCE_PATH_STARTING_VALUES = {
"iam": {
"values": ["user/", "federated-user/", "role/", "group/", "instance-profile/", "mfa/", "server-certificate/",
"policy/", "sms-mfa/", "saml-provider/", "oidc-provider/", "report/", "access-report/"],
"error_message": 'IAM resource path must either be "*" or start with {values}.'
}
}
class IAMPolicyDocumentValidator:
def __init__(self, policy_document):
self._policy_document: str = policy_document
self._policy_json: dict = {}
self._statements = []
self._resource_error = "" # the first resource error found that does not generate a legacy parsing error
def validate(self):
try:
self._validate_syntax()
except Exception:
raise MalformedPolicyDocument("Syntax errors in policy.")
try:
self._validate_version()
except Exception:
raise MalformedPolicyDocument("Policy document must be version 2012-10-17 or greater.")
try:
self._perform_first_legacy_parsing()
self._validate_resources_for_formats()
self._validate_not_resources_for_formats()
except Exception:
raise MalformedPolicyDocument("The policy failed legacy parsing")
try:
self._validate_sid_uniqueness()
except Exception:
raise MalformedPolicyDocument("Statement IDs (SID) in a single policy must be unique.")
try:
self._validate_action_like_exist()
except Exception:
raise MalformedPolicyDocument("Policy statement must contain actions.")
try:
self._validate_resource_exist()
except Exception:
raise MalformedPolicyDocument("Policy statement must contain resources.")
if self._resource_error != "":
raise MalformedPolicyDocument(self._resource_error)
self._validate_actions_for_prefixes()
self._validate_not_actions_for_prefixes()
def _validate_syntax(self):
self._policy_json = json.loads(self._policy_document)
assert isinstance(self._policy_json, dict)
self._validate_top_elements()
self._validate_version_syntax()
self._validate_id_syntax()
self._validate_statements_syntax()
def _validate_top_elements(self):
top_elements = self._policy_json.keys()
for element in top_elements:
assert element in VALID_TOP_ELEMENTS
def _validate_version_syntax(self):
if "Version" in self._policy_json:
assert self._policy_json["Version"] in VALID_VERSIONS
def _validate_version(self):
assert self._policy_json["Version"] == "2012-10-17"
def _validate_sid_uniqueness(self):
sids = []
for statement in self._statements:
if "Sid" in statement:
assert statement["Sid"] not in sids
sids.append(statement["Sid"])
def _validate_statements_syntax(self):
assert "Statement" in self._policy_json
assert isinstance(self._policy_json["Statement"], (dict, list))
if isinstance(self._policy_json["Statement"], dict):
self._statements.append(self._policy_json["Statement"])
else:
self._statements += self._policy_json["Statement"]
assert self._statements
for statement in self._statements:
self._validate_statement_syntax(statement)
@staticmethod
def _validate_statement_syntax(statement):
assert isinstance(statement, dict)
for statement_element in statement.keys():
assert statement_element in VALID_STATEMENT_ELEMENTS
assert ("Resource" not in statement or "NotResource" not in statement)
assert ("Action" not in statement or "NotAction" not in statement)
IAMPolicyDocumentValidator._validate_effect_syntax(statement)
IAMPolicyDocumentValidator._validate_action_syntax(statement)
IAMPolicyDocumentValidator._validate_not_action_syntax(statement)
IAMPolicyDocumentValidator._validate_resource_syntax(statement)
IAMPolicyDocumentValidator._validate_not_resource_syntax(statement)
IAMPolicyDocumentValidator._validate_condition_syntax(statement)
IAMPolicyDocumentValidator._validate_sid_syntax(statement)
@staticmethod
def _validate_effect_syntax(statement):
assert "Effect" in statement
assert isinstance(statement["Effect"], string_types)
assert statement["Effect"].lower() in [allowed_effect.lower() for allowed_effect in VALID_EFFECTS]
@staticmethod
def _validate_action_syntax(statement):
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "Action")
@staticmethod
def _validate_not_action_syntax(statement):
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "NotAction")
@staticmethod
def _validate_resource_syntax(statement):
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "Resource")
@staticmethod
def _validate_not_resource_syntax(statement):
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(statement, "NotResource")
@staticmethod
def _validate_string_or_list_of_strings_syntax(statement, key):
if key in statement:
assert isinstance(statement[key], (string_types, list))
if isinstance(statement[key], list):
for resource in statement[key]:
assert isinstance(resource, string_types)
@staticmethod
def _validate_condition_syntax(statement):
if "Condition" in statement:
assert isinstance(statement["Condition"], dict)
for condition_key, condition_value in statement["Condition"].items():
assert isinstance(condition_value, dict)
for condition_element_key, condition_element_value in condition_value.items():
assert isinstance(condition_element_value, (list, string_types))
if IAMPolicyDocumentValidator._strip_condition_key(condition_key) not in VALID_CONDITIONS:
assert not condition_value # empty dict
@staticmethod
def _strip_condition_key(condition_key):
for valid_prefix in VALID_CONDITION_PREFIXES:
if condition_key.startswith(valid_prefix):
condition_key = condition_key[len(valid_prefix):]
break # strip only the first match
for valid_postfix in VALID_CONDITION_POSTFIXES:
if condition_key.endswith(valid_postfix):
condition_key = condition_key[:-len(valid_postfix)]
break # strip only the first match
return condition_key
@staticmethod
def _validate_sid_syntax(statement):
if "Sid" in statement:
assert isinstance(statement["Sid"], string_types)
def _validate_id_syntax(self):
if "Id" in self._policy_json:
assert isinstance(self._policy_json["Id"], string_types)
def _validate_resource_exist(self):
for statement in self._statements:
assert ("Resource" in statement or "NotResource" in statement)
if "Resource" in statement and isinstance(statement["Resource"], list):
assert statement["Resource"]
elif "NotResource" in statement and isinstance(statement["NotResource"], list):
assert statement["NotResource"]
def _validate_action_like_exist(self):
for statement in self._statements:
assert ("Action" in statement or "NotAction" in statement)
if "Action" in statement and isinstance(statement["Action"], list):
assert statement["Action"]
elif "NotAction" in statement and isinstance(statement["NotAction"], list):
assert statement["NotAction"]
def _validate_actions_for_prefixes(self):
self._validate_action_like_for_prefixes("Action")
def _validate_not_actions_for_prefixes(self):
self._validate_action_like_for_prefixes("NotAction")
def _validate_action_like_for_prefixes(self, key):
for statement in self._statements:
if key in statement:
if isinstance(statement[key], string_types):
self._validate_action_prefix(statement[key])
else:
for action in statement[key]:
self._validate_action_prefix(action)
@staticmethod
def _validate_action_prefix(action):
action_parts = action.split(":")
if len(action_parts) == 1 and action_parts[0] != "*":
raise MalformedPolicyDocument("Actions/Conditions must be prefaced by a vendor, e.g., iam, sdb, ec2, etc.")
elif len(action_parts) > 2:
raise MalformedPolicyDocument("Actions/Condition can contain only one colon.")
vendor_pattern = re.compile(r'[^a-zA-Z0-9\-.]')
if action_parts[0] != "*" and vendor_pattern.search(action_parts[0]):
raise MalformedPolicyDocument("Vendor {vendor} is not valid".format(vendor=action_parts[0]))
def _validate_resources_for_formats(self):
self._validate_resource_like_for_formats("Resource")
def _validate_not_resources_for_formats(self):
self._validate_resource_like_for_formats("NotResource")
def _validate_resource_like_for_formats(self, key):
for statement in self._statements:
if key in statement:
if isinstance(statement[key], string_types):
self._validate_resource_format(statement[key])
else:
for resource in sorted(statement[key], reverse=True):
self._validate_resource_format(resource)
if self._resource_error == "":
IAMPolicyDocumentValidator._legacy_parse_resource_like(statement, key)
def _validate_resource_format(self, resource):
if resource != "*":
resource_partitions = resource.partition(":")
if resource_partitions[1] == "":
self._resource_error = 'Resource {resource} must be in ARN format or "*".'.format(resource=resource)
return
resource_partitions = resource_partitions[2].partition(":")
if resource_partitions[0] != "aws":
remaining_resource_parts = resource_partitions[2].split(":")
arn1 = remaining_resource_parts[0] if remaining_resource_parts[0] != "" or len(remaining_resource_parts) > 1 else "*"
arn2 = remaining_resource_parts[1] if len(remaining_resource_parts) > 1 else "*"
arn3 = remaining_resource_parts[2] if len(remaining_resource_parts) > 2 else "*"
arn4 = ":".join(remaining_resource_parts[3:]) if len(remaining_resource_parts) > 3 else "*"
self._resource_error = 'Partition "{partition}" is not valid for resource "arn:{partition}:{arn1}:{arn2}:{arn3}:{arn4}".'.format(
partition=resource_partitions[0],
arn1=arn1,
arn2=arn2,
arn3=arn3,
arn4=arn4
)
return
if resource_partitions[1] != ":":
self._resource_error = "Resource vendor must be fully qualified and cannot contain regexes."
return
resource_partitions = resource_partitions[2].partition(":")
service = resource_partitions[0]
if service in SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS.keys() and not resource_partitions[2].startswith(":"):
self._resource_error = SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS[service].format(resource=resource)
return
resource_partitions = resource_partitions[2].partition(":")
resource_partitions = resource_partitions[2].partition(":")
if service in VALID_RESOURCE_PATH_STARTING_VALUES.keys():
valid_start = False
for valid_starting_value in VALID_RESOURCE_PATH_STARTING_VALUES[service]["values"]:
if resource_partitions[2].startswith(valid_starting_value):
valid_start = True
break
if not valid_start:
self._resource_error = VALID_RESOURCE_PATH_STARTING_VALUES[service]["error_message"].format(
values=", ".join(VALID_RESOURCE_PATH_STARTING_VALUES[service]["values"])
)
def _perform_first_legacy_parsing(self):
"""This method excludes legacy parsing resources, since that have to be done later."""
for statement in self._statements:
self._legacy_parse_statement(statement)
@staticmethod
def _legacy_parse_statement(statement):
assert statement["Effect"] in VALID_EFFECTS # case-sensitive matching
if "Condition" in statement:
for condition_key, condition_value in statement["Condition"].items():
IAMPolicyDocumentValidator._legacy_parse_condition(condition_key, condition_value)
@staticmethod
def _legacy_parse_resource_like(statement, key):
if isinstance(statement[key], string_types):
if statement[key] != "*":
assert statement[key].count(":") >= 5 or "::" not in statement[key]
assert statement[key].split(":")[2] != ""
else: # list
for resource in statement[key]:
if resource != "*":
assert resource.count(":") >= 5 or "::" not in resource
assert resource[2] != ""
@staticmethod
def _legacy_parse_condition(condition_key, condition_value):
stripped_condition_key = IAMPolicyDocumentValidator._strip_condition_key(condition_key)
if stripped_condition_key.startswith("Date"):
for condition_element_key, condition_element_value in condition_value.items():
if isinstance(condition_element_value, string_types):
IAMPolicyDocumentValidator._legacy_parse_date_condition_value(condition_element_value)
else: # it has to be a list
for date_condition_value in condition_element_value:
IAMPolicyDocumentValidator._legacy_parse_date_condition_value(date_condition_value)
@staticmethod
def _legacy_parse_date_condition_value(date_condition_value):
if "t" in date_condition_value.lower() or "-" in date_condition_value:
IAMPolicyDocumentValidator._validate_iso_8601_datetime(date_condition_value.lower())
else: # timestamp
assert 0 <= int(date_condition_value) <= 9223372036854775807
@staticmethod
def _validate_iso_8601_datetime(datetime):
datetime_parts = datetime.partition("t")
negative_year = datetime_parts[0].startswith("-")
date_parts = datetime_parts[0][1:].split("-") if negative_year else datetime_parts[0].split("-")
year = "-" + date_parts[0] if negative_year else date_parts[0]
assert -292275054 <= int(year) <= 292278993
if len(date_parts) > 1:
month = date_parts[1]
assert 1 <= int(month) <= 12
if len(date_parts) > 2:
day = date_parts[2]
assert 1 <= int(day) <= 31
assert len(date_parts) < 4
time_parts = datetime_parts[2].split(":")
if time_parts[0] != "":
hours = time_parts[0]
assert 0 <= int(hours) <= 23
if len(time_parts) > 1:
minutes = time_parts[1]
assert 0 <= int(minutes) <= 59
if len(time_parts) > 2:
if "z" in time_parts[2]:
seconds_with_decimal_fraction = time_parts[2].partition("z")[0]
assert time_parts[2].partition("z")[2] == ""
elif "+" in time_parts[2]:
seconds_with_decimal_fraction = time_parts[2].partition("+")[0]
time_zone_data = time_parts[2].partition("+")[2].partition(":")
time_zone_hours = time_zone_data[0]
assert len(time_zone_hours) == 2
assert 0 <= int(time_zone_hours) <= 23
if time_zone_data[1] == ":":
time_zone_minutes = time_zone_data[2]
assert len(time_zone_minutes) == 2
assert 0 <= int(time_zone_minutes) <= 59
else:
seconds_with_decimal_fraction = time_parts[2]
seconds_with_decimal_fraction_partition = seconds_with_decimal_fraction.partition(".")
seconds = seconds_with_decimal_fraction_partition[0]
assert 0 <= int(seconds) <= 59
if seconds_with_decimal_fraction_partition[1] == ".":
decimal_seconds = seconds_with_decimal_fraction_partition[2]
assert 0 <= int(decimal_seconds) <= 999999999

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import base64
import json
import boto
import boto3
@ -29,6 +30,44 @@ FyDHrtlrS80dPUQWNYHw++oACDpWO01LGLPPrGmuO/7cOdojPEd852q5gd+7W9xt
8vUH+pBa6IBLbvBp+szli51V3TLSWcoyy4ceJNQU2vCkTLoFdS0RLd/7tQ==
-----END CERTIFICATE-----"""
MOCK_POLICY = """
{
"Version": "2012-10-17",
"Statement":
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::example_bucket"
}
}
"""
MOCK_POLICY_2 = """
{
"Version": "2012-10-17",
"Id": "2",
"Statement":
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::example_bucket"
}
}
"""
MOCK_POLICY_3 = """
{
"Version": "2012-10-17",
"Id": "3",
"Statement":
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::example_bucket"
}
}
"""
@mock_iam_deprecated()
def test_get_all_server_certs():
@ -243,12 +282,12 @@ def test_list_instance_profiles_for_role():
def test_list_role_policies():
conn = boto.connect_iam()
conn.create_role("my-role")
conn.put_role_policy("my-role", "test policy", "my policy")
conn.put_role_policy("my-role", "test policy", MOCK_POLICY)
role = conn.list_role_policies("my-role")
role.policy_names.should.have.length_of(1)
role.policy_names[0].should.equal("test policy")
conn.put_role_policy("my-role", "test policy 2", "another policy")
conn.put_role_policy("my-role", "test policy 2", MOCK_POLICY)
role = conn.list_role_policies("my-role")
role.policy_names.should.have.length_of(2)
@ -266,7 +305,7 @@ def test_put_role_policy():
conn = boto.connect_iam()
conn.create_role(
"my-role", assume_role_policy_document="some policy", path="my-path")
conn.put_role_policy("my-role", "test policy", "my policy")
conn.put_role_policy("my-role", "test policy", MOCK_POLICY)
policy = conn.get_role_policy(
"my-role", "test policy")['get_role_policy_response']['get_role_policy_result']['policy_name']
policy.should.equal("test policy")
@ -286,7 +325,7 @@ def test_create_policy():
conn = boto3.client('iam', region_name='us-east-1')
response = conn.create_policy(
PolicyName="TestCreatePolicy",
PolicyDocument='{"some":"policy"}')
PolicyDocument=MOCK_POLICY)
response['Policy']['Arn'].should.equal("arn:aws:iam::123456789012:policy/TestCreatePolicy")
@ -299,12 +338,12 @@ def test_create_policy_versions():
PolicyDocument='{"some":"policy"}')
conn.create_policy(
PolicyName="TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}')
PolicyDocument=MOCK_POLICY)
version = conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}',
PolicyDocument=MOCK_POLICY,
SetAsDefault=True)
version.get('PolicyVersion').get('Document').should.equal({'some': 'policy'})
version.get('PolicyVersion').get('Document').should.equal(json.loads(MOCK_POLICY))
version.get('PolicyVersion').get('VersionId').should.equal("v2")
version.get('PolicyVersion').get('IsDefaultVersion').should.be.ok
conn.delete_policy_version(
@ -312,7 +351,7 @@ def test_create_policy_versions():
VersionId="v1")
version = conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}')
PolicyDocument=MOCK_POLICY)
version.get('PolicyVersion').get('VersionId').should.equal("v3")
version.get('PolicyVersion').get('IsDefaultVersion').shouldnt.be.ok
@ -362,7 +401,7 @@ def test_get_policy():
conn = boto3.client('iam', region_name='us-east-1')
response = conn.create_policy(
PolicyName="TestGetPolicy",
PolicyDocument='{"some":"policy"}')
PolicyDocument=MOCK_POLICY)
policy = conn.get_policy(
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicy")
policy['Policy']['Arn'].should.equal("arn:aws:iam::123456789012:policy/TestGetPolicy")
@ -384,10 +423,10 @@ def test_get_policy_version():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_policy(
PolicyName="TestGetPolicyVersion",
PolicyDocument='{"some":"policy"}')
PolicyDocument=MOCK_POLICY)
version = conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicyVersion",
PolicyDocument='{"some":"policy"}')
PolicyDocument=MOCK_POLICY)
with assert_raises(ClientError):
conn.get_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicyVersion",
@ -395,7 +434,7 @@ def test_get_policy_version():
retrieved = conn.get_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicyVersion",
VersionId=version.get('PolicyVersion').get('VersionId'))
retrieved.get('PolicyVersion').get('Document').should.equal({'some': 'policy'})
retrieved.get('PolicyVersion').get('Document').should.equal(json.loads(MOCK_POLICY))
retrieved.get('PolicyVersion').get('IsDefaultVersion').shouldnt.be.ok
@ -439,7 +478,7 @@ def test_list_policy_versions():
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
conn.create_policy(
PolicyName="TestListPolicyVersions",
PolicyDocument='{"first":"policy"}')
PolicyDocument=MOCK_POLICY)
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
versions.get('Versions')[0].get('VersionId').should.equal('v1')
@ -447,15 +486,15 @@ def test_list_policy_versions():
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"second":"policy"}')
PolicyDocument=MOCK_POLICY_2)
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"third":"policy"}')
PolicyDocument=MOCK_POLICY_3)
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
versions.get('Versions')[1].get('Document').should.equal({'second': 'policy'})
versions.get('Versions')[1].get('Document').should.equal(json.loads(MOCK_POLICY_2))
versions.get('Versions')[1].get('IsDefaultVersion').shouldnt.be.ok
versions.get('Versions')[2].get('Document').should.equal({'third': 'policy'})
versions.get('Versions')[2].get('Document').should.equal(json.loads(MOCK_POLICY_3))
versions.get('Versions')[2].get('IsDefaultVersion').shouldnt.be.ok
@ -464,10 +503,10 @@ def test_delete_policy_version():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_policy(
PolicyName="TestDeletePolicyVersion",
PolicyDocument='{"first":"policy"}')
PolicyDocument=MOCK_POLICY)
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
PolicyDocument='{"second":"policy"}')
PolicyDocument=MOCK_POLICY)
with assert_raises(ClientError):
conn.delete_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
@ -549,22 +588,20 @@ def test_list_users():
@mock_iam()
def test_user_policies():
policy_name = 'UserManagedPolicy'
policy_document = "{'mypolicy': 'test'}"
user_name = 'my-user'
conn = boto3.client('iam', region_name='us-east-1')
conn.create_user(UserName=user_name)
conn.put_user_policy(
UserName=user_name,
PolicyName=policy_name,
PolicyDocument=policy_document
PolicyDocument=MOCK_POLICY
)
policy_doc = conn.get_user_policy(
UserName=user_name,
PolicyName=policy_name
)
test = policy_document in policy_doc['PolicyDocument']
test.should.equal(True)
policy_doc['PolicyDocument'].should.equal(json.loads(MOCK_POLICY))
policies = conn.list_user_policies(UserName=user_name)
len(policies['PolicyNames']).should.equal(1)
@ -725,7 +762,7 @@ def test_managed_policy():
conn = boto.connect_iam()
conn.create_policy(policy_name='UserManagedPolicy',
policy_document={'mypolicy': 'test'},
policy_document=MOCK_POLICY,
path='/mypolicy/',
description='my user managed policy')
@ -826,7 +863,7 @@ def test_attach_detach_user_policy():
policy_name = 'UserAttachedPolicy'
policy = iam.create_policy(PolicyName=policy_name,
PolicyDocument='{"mypolicy": "test"}',
PolicyDocument=MOCK_POLICY,
Path='/mypolicy/',
Description='my user attached policy')
@ -882,7 +919,6 @@ def test_get_access_key_last_used():
@mock_iam
def test_get_account_authorization_details():
import json
test_policy = json.dumps({
"Version": "2012-10-17",
"Statement": [
@ -1314,7 +1350,6 @@ def test_update_role():
@mock_iam()
def test_list_entities_for_policy():
import json
test_policy = json.dumps({
"Version": "2012-10-17",
"Statement": [

View File

@ -10,6 +10,18 @@ from nose.tools import assert_raises
from boto.exception import BotoServerError
from moto import mock_iam, mock_iam_deprecated
MOCK_POLICY = """
{
"Version": "2012-10-17",
"Statement":
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::example_bucket"
}
}
"""
@mock_iam_deprecated()
def test_create_group():
@ -101,7 +113,7 @@ def test_get_groups_for_user():
def test_put_group_policy():
conn = boto.connect_iam()
conn.create_group('my-group')
conn.put_group_policy('my-group', 'my-policy', '{"some": "json"}')
conn.put_group_policy('my-group', 'my-policy', MOCK_POLICY)
@mock_iam
@ -131,7 +143,7 @@ def test_get_group_policy():
with assert_raises(BotoServerError):
conn.get_group_policy('my-group', 'my-policy')
conn.put_group_policy('my-group', 'my-policy', '{"some": "json"}')
conn.put_group_policy('my-group', 'my-policy', MOCK_POLICY)
conn.get_group_policy('my-group', 'my-policy')
@ -141,7 +153,7 @@ def test_get_all_group_policies():
conn.create_group('my-group')
policies = conn.get_all_group_policies('my-group')['list_group_policies_response']['list_group_policies_result']['policy_names']
assert policies == []
conn.put_group_policy('my-group', 'my-policy', '{"some": "json"}')
conn.put_group_policy('my-group', 'my-policy', MOCK_POLICY)
policies = conn.get_all_group_policies('my-group')['list_group_policies_response']['list_group_policies_result']['policy_names']
assert policies == ['my-policy']
@ -151,5 +163,5 @@ def test_list_group_policies():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_group(GroupName='my-group')
conn.list_group_policies(GroupName='my-group')['PolicyNames'].should.be.empty
conn.put_group_policy(GroupName='my-group', PolicyName='my-policy', PolicyDocument='{"some": "json"}')
conn.put_group_policy(GroupName='my-group', PolicyName='my-policy', PolicyDocument=MOCK_POLICY)
conn.list_group_policies(GroupName='my-group')['PolicyNames'].should.equal(['my-policy'])

File diff suppressed because it is too large Load Diff