testing create, get, list, delete policy versions

This commit is contained in:
Kate Heddleston 2017-05-15 14:56:30 -07:00
parent e307dc38e6
commit 992b475093
4 changed files with 150 additions and 20 deletions

View File

@ -53,15 +53,15 @@ class Policy(BaseModel):
return 'arn:aws:iam::aws:policy{0}{1}'.format(self.path, self.name)
class Version(object):
class PolicyVersion(object):
def __init__(self,
policy_arn,
document,
is_default_version=False):
is_default=False):
self.policy_arn = policy_arn
self.document = document or {}
self.is_default_version = is_default_version
self.is_default = is_default
self.version_id = 'v1'
self.create_datetime = datetime.now(pytz.utc)
@ -506,6 +506,9 @@ class IAMBackend(BaseBackend):
self.managed_policies[policy.name] = policy
return policy
def get_policy(self, policy_name):
return self.managed_policies.get(policy_name)
def list_attached_role_policies(self, role_name, marker=None, max_items=100, path_prefix='/'):
policies = self.get_role(role_name).managed_policies.values()
@ -551,15 +554,6 @@ class IAMBackend(BaseBackend):
return policies, marker
def get_policy(self, policy_name):
policy = self.managed_policies[policy_name]
if not policy:
raise IAMNotFoundException("Policy {0} not found".format(policy_name))
return policy
def get_policies(self):
return self.managed_policies.values()
def create_role(self, role_name, assume_role_policy_document, path):
role_id = random_resource_id()
role = Role(role_id, role_name, assume_role_policy_document, path)
@ -596,19 +590,44 @@ class IAMBackend(BaseBackend):
policy_name = policy_arn.split(':')[-1]
policy_name = policy_name.split('/')[1]
policy = self.get_policy(policy_name)
version = Version(policy_arn, policy_document, set_as_default)
if not policy:
raise IAMNotFoundException("Policy not found")
version = PolicyVersion(policy_arn, policy_document, set_as_default)
policy.versions.append(version)
if set_as_default:
policy.default_version_id = version.version_id
return version
def get_policy_version(self, policy_arn, version_id):
policy_name = policy_arn.split(':')[-1]
policy_name = policy_name.split('/')[1]
policy = self.get_policy(policy_name)
if not policy:
raise IAMNotFoundException("Policy not found")
for version in policy.versions:
if version.version_id == version_id:
return version
raise IAMNotFoundException("Policy version not found")
def list_policy_versions(self, policy_arn):
policy_name = policy_arn.split(':')[-1]
policy_name = policy_name.split('/')[1]
policy = self.get_policy(policy_name)
if not policy:
raise IAMNotFoundException("Policy not found")
return policy.versions
def delete_policy_version(self, policy_arn, version_id):
policy_name = policy_arn.split(':')[-1]
policy_name = policy_name.split('/')[1]
policy = self.get_policy(policy_name)
if not policy:
raise IAMNotFoundException("Policy not found")
for i, v in enumerate(policy.versions):
if v.version_id == version_id:
del policy.versions[i]
return
raise IAMNotFoundException("Policy not found")
def create_instance_profile(self, name, path, role_ids):
instance_profile_id = random_resource_id()

View File

@ -98,9 +98,15 @@ class IamResponse(BaseResponse):
policy_document = self._get_param('PolicyDocument')
set_as_default = self._get_param('SetAsDefault')
policy_version = iam_backend.create_policy_version(policy_arn, policy_document, set_as_default)
template = self.response_template(CREATE_POLICY_VERSION_TEMPLATE)
return template.render(policy_version=policy_version)
template = self.response_template(LIST_POLICY_VERSIONS_TEMPLATE)
return template.render(policy_versions=[policy_version])
def get_policy_version(self):
policy_arn = self._get_param('PolicyArn')
version_id = self._get_param('VersionId')
policy_version = iam_backend.get_policy_version(policy_arn, version_id)
template = self.response_template(GET_POLICY_VERSION_TEMPLATE)
return template.render(policy_version=policy_version)
def list_policy_versions(self):
policy_arn = self._get_param('PolicyArn')
@ -624,15 +630,43 @@ LIST_ROLE_POLICIES = """<ListRolePoliciesResponse xmlns="https://iam.amazonaws.c
</ResponseMetadata>
</ListRolePoliciesResponse>"""
CREATE_POLICY_VERSION_TEMPLATE = """<CreatePolicyVersionResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<CreatePolicyVersionResult>
<PolicyVersion>
<Document>{{ policy_version.document }}</Document>
<VersionId>{{ policy_version.version_id }}</VersionId>
<IsDefaultVersion>{{ policy_version.is_default }}</IsDefaultVersion>
<CreateDate>2012-05-09T15:45:35Z</CreateDate>
</PolicyVersion>
</CreatePolicyVersionResult>
<ResponseMetadata>
<RequestId>20f7279f-99ee-11e1-a4c3-27EXAMPLE804</RequestId>
</ResponseMetadata>
</CreatePolicyVersionResponse>"""
GET_POLICY_VERSION_TEMPLATE = """<GetPolicyVersionResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<GetPolicyVersionResult>
<PolicyVersion>
<Document>{{ policy_version.document }}</Document>
<VersionId>{{ policy_version.version_id }}</VersionId>
<IsDefaultVersion>{{ policy_version.is_default }}</IsDefaultVersion>
<CreateDate>2012-05-09T15:45:35Z</CreateDate>
</PolicyVersion>
</GetPolicyVersionResult>
<ResponseMetadata>
<RequestId>20f7279f-99ee-11e1-a4c3-27EXAMPLE804</RequestId>
</ResponseMetadata>
</GetPolicyVersionResponse>"""
LIST_POLICY_VERSIONS_TEMPLATE = """<ListPolicyVersionsResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<ListPolicyVersionsResult>
<IsTruncated>false</IsTruncated>
<Versions>
{% for version in policy_versions %}
{% for policy_version in policy_versions %}
<member>
<Document>{{ version.document }}</Document>
<VersionId>{{ version.version_id }}</VersionId>
<IsDefaultVersion>{{ version.is_default_version }}</IsDefaultVersion>
<Document>{{ policy_version.document }}</Document>
<VersionId>{{ policy_version.version_id }}</VersionId>
<IsDefaultVersion>{{ policy_version.is_default }}</IsDefaultVersion>
<CreateDate>2012-05-09T15:45:35Z</CreateDate>
</member>
{% endfor %}

View File

@ -196,6 +196,83 @@ def test_update_assume_role_policy():
role.assume_role_policy_document.should.equal("my-policy")
@mock_iam
def test_create_policy_versions():
conn = boto3.client('iam', region_name='us-east-1')
with assert_raises(ClientError):
conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}')
conn.create_policy(
PolicyName="TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}')
version = conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestCreatePolicyVersion",
PolicyDocument='{"some":"policy"}')
version.get('PolicyVersion').get('Document').should.equal({'some': 'policy'})
@mock_iam
def test_get_policy_version():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_policy(
PolicyName="TestGetPolicyVersion",
PolicyDocument='{"some":"policy"}')
version = conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestGetPolicyVersion",
PolicyDocument='{"some":"policy"}')
with assert_raises(ClientError):
conn.get_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestGetPolicyVersion",
VersionId='v2-does-not-exist')
retrieved = conn.get_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestGetPolicyVersion",
VersionId=version.get('PolicyVersion').get('VersionId'))
retrieved.get('PolicyVersion').get('Document').should.equal({'some': 'policy'})
@mock_iam
def test_list_policy_versions():
conn = boto3.client('iam', region_name='us-east-1')
with assert_raises(ClientError):
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::aws:policy/TestListPolicyVersions")
conn.create_policy(
PolicyName="TestListPolicyVersions",
PolicyDocument='{"some":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestListPolicyVersions",
PolicyDocument='{"first":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestListPolicyVersions",
PolicyDocument='{"second":"policy"}')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::aws:policy/TestListPolicyVersions")
versions.get('Versions')[0].get('Document').should.equal({'first': 'policy'})
versions.get('Versions')[1].get('Document').should.equal({'second': 'policy'})
@mock_iam
def test_delete_policy_version():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_policy(
PolicyName="TestDeletePolicyVersion",
PolicyDocument='{"some":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestDeletePolicyVersion",
PolicyDocument='{"first":"policy"}')
with assert_raises(ClientError):
conn.delete_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestDeletePolicyVersion",
VersionId='v2-nope-this-does-not-exist')
conn.delete_policy_version(
PolicyArn="arn:aws:iam::aws:policy/TestDeletePolicyVersion",
VersionId='v1')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::aws:policy/TestDeletePolicyVersion")
len(versions.get('Versions')).should.equal(0)
@mock_iam_deprecated()
def test_create_user():
conn = boto.connect_iam()

View File

@ -392,7 +392,7 @@ def test_delete_message():
@mock_sqs_deprecated
def test_send_batch_operation():
conn = boto.connect_sqs('the_key', 'the_secret')
queue = conn.create_queue("test-queue", visibility_timeout=60)
queue = conn.create_queue("test-queue", visibility_timeout=3)
# See https://github.com/boto/boto/issues/831
queue.set_message_class(RawMessage)