Merge pull request #1759 from kgutwin/iam_get_policy

Implement IAM get_policy and correct policy version behavior
This commit is contained in:
Steve Pulec 2018-09-22 16:59:07 -04:00 committed by GitHub
commit c035348ae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 27 deletions

View File

@ -37,7 +37,6 @@ class Policy(BaseModel):
description=None,
document=None,
path=None):
self.document = document or {}
self.name = name
self.attachment_count = 0
@ -45,7 +44,7 @@ class Policy(BaseModel):
self.id = random_policy_id()
self.path = path or '/'
self.default_version_id = default_version_id or 'v1'
self.versions = []
self.versions = [PolicyVersion(self.arn, document, True)]
self.create_datetime = datetime.now(pytz.utc)
self.update_datetime = datetime.now(pytz.utc)
@ -72,11 +71,11 @@ class ManagedPolicy(Policy):
def attach_to(self, obj):
self.attachment_count += 1
obj.managed_policies[self.name] = self
obj.managed_policies[self.arn] = self
def detach_from(self, obj):
self.attachment_count -= 1
del obj.managed_policies[self.name]
del obj.managed_policies[self.arn]
@property
def arn(self):
@ -477,11 +476,13 @@ class IAMBackend(BaseBackend):
document=policy_document,
path=path,
)
self.managed_policies[policy.name] = policy
self.managed_policies[policy.arn] = policy
return policy
def get_policy(self, policy_name):
return self.managed_policies.get(policy_name)
def get_policy(self, policy_arn):
if policy_arn not in self.managed_policies:
raise IAMNotFoundException("Policy {0} not found".format(policy_arn))
return self.managed_policies.get(policy_arn)
def list_attached_role_policies(self, role_name, marker=None, max_items=100, path_prefix='/'):
policies = self.get_role(role_name).managed_policies.values()
@ -575,21 +576,18 @@ class IAMBackend(BaseBackend):
return role.policies.keys()
def create_policy_version(self, policy_arn, policy_document, set_as_default):
policy_name = policy_arn.split(':')[-1]
policy_name = policy_name.split('/')[1]
policy = self.get_policy(policy_name)
policy = self.get_policy(policy_arn)
if not policy:
raise IAMNotFoundException("Policy not found")
version = PolicyVersion(policy_arn, policy_document, set_as_default)
policy.versions.append(version)
version.version_id = 'v{0}'.format(len(policy.versions))
if set_as_default:
policy.default_version_id = version.version_id
return version
def get_policy_version(self, policy_arn, version_id):
policy_name = policy_arn.split(':')[-1]
policy_name = policy_name.split('/')[1]
policy = self.get_policy(policy_name)
policy = self.get_policy(policy_arn)
if not policy:
raise IAMNotFoundException("Policy not found")
for version in policy.versions:
@ -598,19 +596,18 @@ class IAMBackend(BaseBackend):
raise IAMNotFoundException("Policy version not found")
def list_policy_versions(self, policy_arn):
policy_name = policy_arn.split(':')[-1]
policy_name = policy_name.split('/')[1]
policy = self.get_policy(policy_name)
policy = self.get_policy(policy_arn)
if not policy:
raise IAMNotFoundException("Policy not found")
return policy.versions
def delete_policy_version(self, policy_arn, version_id):
policy_name = policy_arn.split(':')[-1]
policy_name = policy_name.split('/')[1]
policy = self.get_policy(policy_name)
policy = self.get_policy(policy_arn)
if not policy:
raise IAMNotFoundException("Policy not found")
if version_id == policy.default_version_id:
raise IAMConflictException(
"Cannot delete the default version of a policy")
for i, v in enumerate(policy.versions):
if v.version_id == version_id:
del policy.versions[i]

View File

@ -58,6 +58,12 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_POLICY_TEMPLATE)
return template.render(policy=policy)
def get_policy(self):
policy_arn = self._get_param('PolicyArn')
policy = iam_backend.get_policy(policy_arn)
template = self.response_template(GET_POLICY_TEMPLATE)
return template.render(policy=policy)
def list_attached_role_policies(self):
marker = self._get_param('Marker')
max_items = self._get_int_param('MaxItems', 100)
@ -601,6 +607,25 @@ CREATE_POLICY_TEMPLATE = """<CreatePolicyResponse>
</ResponseMetadata>
</CreatePolicyResponse>"""
GET_POLICY_TEMPLATE = """<GetPolicyResponse>
<GetPolicyResult>
<Policy>
<PolicyName>{{ policy.name }}</PolicyName>
<Description>{{ policy.description }}</Description>
<DefaultVersionId>{{ policy.default_version_id }}</DefaultVersionId>
<PolicyId>{{ policy.id }}</PolicyId>
<Path>{{ policy.path }}</Path>
<Arn>{{ policy.arn }}</Arn>
<AttachmentCount>{{ policy.attachment_count }}</AttachmentCount>
<CreateDate>{{ policy.create_datetime.isoformat() }}</CreateDate>
<UpdateDate>{{ policy.update_datetime.isoformat() }}</UpdateDate>
</Policy>
</GetPolicyResult>
<ResponseMetadata>
<RequestId>684f0917-3d22-11e4-a4a0-cffb9EXAMPLE</RequestId>
</ResponseMetadata>
</GetPolicyResponse>"""
LIST_ATTACHED_ROLE_POLICIES_TEMPLATE = """<ListAttachedRolePoliciesResponse>
<ListAttachedRolePoliciesResult>
{% if marker is none %}

View File

@ -286,6 +286,16 @@ def test_create_policy_versions():
PolicyDocument='{"some":"policy"}')
version.get('PolicyVersion').get('Document').should.equal({'some': 'policy'})
@mock_iam
def test_get_policy():
conn = boto3.client('iam', region_name='us-east-1')
response = conn.create_policy(
PolicyName="TestGetPolicy",
PolicyDocument='{"some":"policy"}')
policy = conn.get_policy(
PolicyArn="arn:aws:iam::123456789012:policy/TestGetPolicy")
response['Policy']['Arn'].should.equal("arn:aws:iam::123456789012:policy/TestGetPolicy")
@mock_iam
def test_get_policy_version():
@ -314,17 +324,22 @@ def test_list_policy_versions():
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
conn.create_policy(
PolicyName="TestListPolicyVersions",
PolicyDocument='{"some":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"first":"policy"}')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
versions.get('Versions')[0].get('VersionId').should.equal('v1')
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"second":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions",
PolicyDocument='{"third":"policy"}')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::123456789012:policy/TestListPolicyVersions")
versions.get('Versions')[0].get('Document').should.equal({'first': 'policy'})
print(versions.get('Versions'))
versions.get('Versions')[1].get('Document').should.equal({'second': 'policy'})
versions.get('Versions')[2].get('Document').should.equal({'third': 'policy'})
@mock_iam
@ -332,20 +347,20 @@ def test_delete_policy_version():
conn = boto3.client('iam', region_name='us-east-1')
conn.create_policy(
PolicyName="TestDeletePolicyVersion",
PolicyDocument='{"some":"policy"}')
PolicyDocument='{"first":"policy"}')
conn.create_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
PolicyDocument='{"first":"policy"}')
PolicyDocument='{"second":"policy"}')
with assert_raises(ClientError):
conn.delete_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
VersionId='v2-nope-this-does-not-exist')
conn.delete_policy_version(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion",
VersionId='v1')
VersionId='v2')
versions = conn.list_policy_versions(
PolicyArn="arn:aws:iam::123456789012:policy/TestDeletePolicyVersion")
len(versions.get('Versions')).should.equal(0)
len(versions.get('Versions')).should.equal(1)
@mock_iam_deprecated()