adding AWS IoT policy version handling

[+] `list_policy_version`
[+] `get_policy_version`
[+] `create_policy_version`
[+] `delete_policy_version`
[+] `set_default_policy_version`
This commit is contained in:
Stephan Huber 2018-10-25 12:13:56 +02:00
parent 36d8f118e3
commit bb7e1197bc
3 changed files with 215 additions and 7 deletions

View File

@ -136,18 +136,19 @@ class FakeCertificate(BaseModel):
class FakePolicy(BaseModel):
def __init__(self, name, document, region_name):
def __init__(self, name, document, region_name, default_version_id='1'):
self.name = name
self.document = document
self.arn = 'arn:aws:iot:%s:1:policy/%s' % (region_name, name)
self.version = '1' # TODO: handle version
self.default_version_id = default_version_id
self.versions = [FakePolicyVersion(self.name, document, True, region_name)]
def to_get_dict(self):
return {
'policyName': self.name,
'policyArn': self.arn,
'policyDocument': self.document,
'defaultVersionId': self.version
'defaultVersionId': self.default_version_id
}
def to_dict_at_creation(self):
@ -155,7 +156,7 @@ class FakePolicy(BaseModel):
'policyName': self.name,
'policyArn': self.arn,
'policyDocument': self.document,
'policyVersionId': self.version
'policyVersionId': self.default_version_id
}
def to_dict(self):
@ -165,6 +166,50 @@ class FakePolicy(BaseModel):
}
class FakePolicyVersion(object):
def __init__(self,
policy_name,
document,
is_default,
region_name):
self.name = policy_name
self.arn = 'arn:aws:iot:%s:1:policy/%s' % (region_name, policy_name)
self.document = document or {}
self.is_default = is_default
self.version_id = '1'
self.create_datetime = time.mktime(datetime(2015, 1, 1).timetuple())
self.last_modified_datetime = time.mktime(datetime(2015, 1, 2).timetuple())
def to_get_dict(self):
return {
'policyName': self.name,
'policyArn': self.arn,
'policyDocument': self.document,
'policyVersionId': self.version_id,
'isDefaultVersion': self.is_default,
'creationDate': self.create_datetime,
'lastModifiedDate': self.last_modified_datetime,
'generationId': self.version_id
}
def to_dict_at_creation(self):
return {
'policyArn': self.arn,
'policyDocument': self.document,
'policyVersionId': self.version_id,
'isDefaultVersion': self.is_default
}
def to_dict(self):
return {
'versionId': self.version_id,
'isDefaultVersion': self.is_default,
'createDate': self.create_datetime,
}
class FakeJob(BaseModel):
JOB_ID_REGEX_PATTERN = "[a-zA-Z0-9_-]"
JOB_ID_REGEX = re.compile(JOB_ID_REGEX_PATTERN)
@ -436,6 +481,57 @@ class IoTBackend(BaseBackend):
policy = self.get_policy(policy_name)
del self.policies[policy.name]
def create_policy_version(self, policy_name, policy_document, set_as_default):
policy = self.get_policy(policy_name)
if not policy:
raise ResourceNotFoundException()
version = FakePolicyVersion(policy_name, policy_document, set_as_default, self.region_name)
policy.versions.append(version)
version.version_id = '{0}'.format(len(policy.versions))
if set_as_default:
self.set_default_policy_version(policy_name, version.version_id)
return version
def set_default_policy_version(self, policy_name, version_id):
policy = self.get_policy(policy_name)
if not policy:
raise ResourceNotFoundException()
for version in policy.versions:
if version.version_id == version_id:
version.is_default = True
policy.default_version_id = version.version_id
policy.document = version.document
else:
version.is_default = False
def get_policy_version(self, policy_name, version_id):
policy = self.get_policy(policy_name)
if not policy:
raise ResourceNotFoundException()
for version in policy.versions:
if version.version_id == version_id:
return version
raise ResourceNotFoundException()
def list_policy_versions(self, policy_name):
policy = self.get_policy(policy_name)
if not policy:
raise ResourceNotFoundException()
return policy.versions
def delete_policy_version(self, policy_name, version_id):
policy = self.get_policy(policy_name)
if not policy:
raise ResourceNotFoundException()
if version_id == policy.default_version_id:
raise InvalidRequestException(
"Cannot delete the default version of a policy")
for i, v in enumerate(policy.versions):
if v.version_id == version_id:
del policy.versions[i]
return
raise ResourceNotFoundException()
def _get_principal(self, principal_arn):
"""
raise ResourceNotFoundException

View File

@ -1,7 +1,7 @@
from __future__ import unicode_literals
import json
from urllib.parse import unquote
from six.moves.urllib.parse import parse_qs, urlparse, unquote
from moto.core.responses import BaseResponse
from .models import iot_backends
@ -235,6 +235,40 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def create_policy_version(self):
policy_name = self._get_param('policyName')
policy_document = self._get_param('policyDocument')
set_as_default = self._get_bool_param('setAsDefault')
policy_version = self.iot_backend.create_policy_version(policy_name, policy_document, set_as_default)
return json.dumps(dict(policy_version.to_dict_at_creation()))
def set_default_policy_version(self):
policy_name = self._get_param('policyName')
version_id = self._get_param('policyVersionId')
self.iot_backend.set_default_policy_version(policy_name, version_id)
return json.dumps(dict())
def get_policy_version(self):
policy_name = self._get_param('policyName')
version_id = self._get_param('policyVersionId')
policy_version = self.iot_backend.get_policy_version(policy_name, version_id)
return json.dumps(dict(policy_version.to_get_dict()))
def list_policy_versions(self):
policy_name = self._get_param('policyName')
policiy_versions = self.iot_backend.list_policy_versions(policy_name=policy_name)
return json.dumps(dict(policyVersions=[_.to_dict() for _ in policiy_versions]))
def delete_policy_version(self):
policy_name = self._get_param('policyName')
version_id = self._get_param('policyVersionId')
self.iot_backend.delete_policy_version(policy_name, version_id)
return json.dumps(dict())
def attach_policy(self):
policy_name = self._get_param("policyName")
principal = self._get_param('target')

View File

@ -1,8 +1,7 @@
from __future__ import unicode_literals
import json
import sure # noqa
import sure #noqa
import boto3
from moto import mock_iot
@ -52,6 +51,85 @@ def test_list_attached_policies():
policies['policies'].should.be.empty
@mock_iot
def test_policy_versions():
client = boto3.client('iot', region_name='ap-northeast-1')
policy_name = 'my-policy'
doc = '{}'
policy = client.create_policy(policyName=policy_name, policyDocument=doc)
policy.should.have.key('policyName').which.should.equal(policy_name)
policy.should.have.key('policyArn').which.should_not.be.none
policy.should.have.key('policyDocument').which.should.equal(json.dumps({}))
policy.should.have.key('policyVersionId').which.should.equal('1')
policy = client.get_policy(policyName=policy_name)
policy.should.have.key('policyName').which.should.equal(policy_name)
policy.should.have.key('policyArn').which.should_not.be.none
policy.should.have.key('policyDocument').which.should.equal(json.dumps({}))
policy.should.have.key('defaultVersionId').which.should.equal(policy['defaultVersionId'])
policy1 = client.create_policy_version(policyName=policy_name, policyDocument=json.dumps({'version': 'version_1'}),
setAsDefault=True)
policy1.should.have.key('policyArn').which.should_not.be.none
policy1.should.have.key('policyDocument').which.should.equal(json.dumps({'version': 'version_1'}))
policy1.should.have.key('policyVersionId').which.should.equal('2')
policy1.should.have.key('isDefaultVersion').which.should.equal(True)
policy2 = client.create_policy_version(policyName=policy_name, policyDocument=json.dumps({'version': 'version_2'}),
setAsDefault=False)
policy2.should.have.key('policyArn').which.should_not.be.none
policy2.should.have.key('policyDocument').which.should.equal(json.dumps({'version': 'version_2'}))
policy2.should.have.key('policyVersionId').which.should.equal('3')
policy2.should.have.key('isDefaultVersion').which.should.equal(False)
policy = client.get_policy(policyName=policy_name)
policy.should.have.key('policyName').which.should.equal(policy_name)
policy.should.have.key('policyArn').which.should_not.be.none
policy.should.have.key('policyDocument').which.should.equal(json.dumps({'version': 'version_1'}))
policy.should.have.key('defaultVersionId').which.should.equal(policy1['policyVersionId'])
policy_versions = client.list_policy_versions(policyName=policy_name)
policy_versions.should.have.key('policyVersions').which.should.have.length_of(3)
list(map(lambda item: item['isDefaultVersion'], policy_versions['policyVersions'])).count(True).should.equal(1)
default_policy = list(filter(lambda item: item['isDefaultVersion'], policy_versions['policyVersions']))
default_policy[0].should.have.key('versionId').should.equal(policy1['policyVersionId'])
policy = client.get_policy(policyName=policy_name)
policy.should.have.key('policyName').which.should.equal(policy_name)
policy.should.have.key('policyArn').which.should_not.be.none
policy.should.have.key('policyDocument').which.should.equal(json.dumps({'version': 'version_1'}))
policy.should.have.key('defaultVersionId').which.should.equal(policy1['policyVersionId'])
client.set_default_policy_version(policyName=policy_name, policyVersionId=policy2['policyVersionId'])
policy_versions = client.list_policy_versions(policyName=policy_name)
policy_versions.should.have.key('policyVersions').which.should.have.length_of(3)
list(map(lambda item: item['isDefaultVersion'], policy_versions['policyVersions'])).count(True).should.equal(1)
default_policy = list(filter(lambda item: item['isDefaultVersion'], policy_versions['policyVersions']))
default_policy[0].should.have.key('versionId').should.equal(policy2['policyVersionId'])
policy = client.get_policy(policyName=policy_name)
policy.should.have.key('policyName').which.should.equal(policy_name)
policy.should.have.key('policyArn').which.should_not.be.none
policy.should.have.key('policyDocument').which.should.equal(json.dumps({'version': 'version_2'}))
policy.should.have.key('defaultVersionId').which.should.equal(policy2['policyVersionId'])
client.delete_policy_version(policyName=policy_name, policyVersionId='1')
policy_versions = client.list_policy_versions(policyName=policy_name)
policy_versions.should.have.key('policyVersions').which.should.have.length_of(2)
client.delete_policy_version(policyName=policy_name, policyVersionId=policy1['policyVersionId'])
policy_versions = client.list_policy_versions(policyName=policy_name)
policy_versions.should.have.key('policyVersions').which.should.have.length_of(1)
# should fail as it's the default policy. Should use delete_policy instead
try:
client.delete_policy_version(policyName=policy_name, policyVersionId=policy2['policyVersionId'])
assert False, 'Should have failed in previous call'
except Exception as exception:
exception.response['Error']['Message'].should.equal('Cannot delete the default version of a policy')
@mock_iot
def test_things():
client = boto3.client('iot', region_name='ap-northeast-1')