Add some validations for IoT delete operations

fix #1908
This commit is contained in:
Tomoya Iwata 2018-10-30 14:38:59 +09:00
parent 71a054af92
commit 3d71a67794
3 changed files with 138 additions and 1 deletions

View File

@ -31,3 +31,20 @@ class VersionConflictException(IoTClientError):
'VersionConflictException',
'The version for thing %s does not match the expected version.' % name
)
class CertificateStateException(IoTClientError):
def __init__(self, msg, cert_id):
self.code = 406
super(CertificateStateException, self).__init__(
'CertificateStateException',
'%s Id: %s' % (msg, cert_id)
)
class DeleteConflictException(IoTClientError):
def __init__(self, msg):
self.code = 409
super(DeleteConflictException, self).__init__(
'DeleteConflictException', msg
)

View File

@ -13,6 +13,8 @@ import boto3
from moto.core import BaseBackend, BaseModel
from .exceptions import (
CertificateStateException,
DeleteConflictException,
ResourceNotFoundException,
InvalidRequestException,
VersionConflictException
@ -378,7 +380,25 @@ class IoTBackend(BaseBackend):
return certificate, key_pair
def delete_certificate(self, certificate_id):
self.describe_certificate(certificate_id)
cert = self.describe_certificate(certificate_id)
if cert.status == 'ACTIVE':
raise CertificateStateException(
'Certificate must be deactivated (not ACTIVE) before deletion.', certificate_id)
certs = [k[0] for k, v in self.principal_things.items()
if self._get_principal(k[0]).certificate_id == certificate_id]
if len(certs) > 0:
raise DeleteConflictException(
'Things must be detached before deletion (arn: %s)' % certs[0]
)
certs = [k[0] for k, v in self.principal_policies.items()
if self._get_principal(k[0]).certificate_id == certificate_id]
if len(certs) > 0:
raise DeleteConflictException(
'Certificate policies must be detached before deletion (arn: %s)' % certs[0]
)
del self.certificates[certificate_id]
def describe_certificate(self, certificate_id):
@ -411,6 +431,14 @@ class IoTBackend(BaseBackend):
return policies[0]
def delete_policy(self, policy_name):
policies = [k[1] for k, v in self.principal_policies.items() if k[1] == policy_name]
if len(policies) > 0:
raise DeleteConflictException(
'The policy cannot be deleted as the policy is attached to one or more principals (name=%s)'
% policy_name
)
policy = self.get_policy(policy_name)
del self.policies[policy.name]

View File

@ -5,6 +5,8 @@ import sure # noqa
import boto3
from moto import mock_iot
from botocore.exceptions import ClientError
from nose.tools import assert_raises
@mock_iot
@ -261,6 +263,96 @@ def test_certs():
res.should.have.key('certificates').which.should.have.length_of(0)
@mock_iot
def test_delete_policy_validation():
doc = """{
"Version": "2012-10-17",
"Statement":[
{
"Effect":"Allow",
"Action":[
"iot: *"
],
"Resource":"*"
}
]
}
"""
client = boto3.client('iot', region_name='ap-northeast-1')
cert = client.create_keys_and_certificate(setAsActive=True)
cert_arn = cert['certificateArn']
policy_name = 'my-policy'
client.create_policy(policyName=policy_name, policyDocument=doc)
client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
with assert_raises(ClientError) as e:
client.delete_policy(policyName=policy_name)
e.exception.response['Error']['Message'].should.contain(
'The policy cannot be deleted as the policy is attached to one or more principals (name=%s)' % policy_name)
res = client.list_policies()
res.should.have.key('policies').which.should.have.length_of(1)
client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
client.delete_policy(policyName=policy_name)
res = client.list_policies()
res.should.have.key('policies').which.should.have.length_of(0)
@mock_iot
def test_delete_certificate_validation():
doc = """{
"Version": "2012-10-17",
"Statement":[
{
"Effect":"Allow",
"Action":[
"iot: *"
],
"Resource":"*"
}
]
}
"""
client = boto3.client('iot', region_name='ap-northeast-1')
cert = client.create_keys_and_certificate(setAsActive=True)
cert_id = cert['certificateId']
cert_arn = cert['certificateArn']
policy_name = 'my-policy'
thing_name = 'thing-1'
client.create_policy(policyName=policy_name, policyDocument=doc)
client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
client.create_thing(thingName=thing_name)
client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
with assert_raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id)
e.exception.response['Error']['Message'].should.contain(
'Certificate must be deactivated (not ACTIVE) before deletion.')
res = client.list_certificates()
res.should.have.key('certificates').which.should.have.length_of(1)
client.update_certificate(certificateId=cert_id, newStatus='REVOKED')
with assert_raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id)
e.exception.response['Error']['Message'].should.contain(
'Things must be detached before deletion (arn: %s)' % cert_arn)
res = client.list_certificates()
res.should.have.key('certificates').which.should.have.length_of(1)
client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
with assert_raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id)
e.exception.response['Error']['Message'].should.contain(
'Certificate policies must be detached before deletion (arn: %s)' % cert_arn)
res = client.list_certificates()
res.should.have.key('certificates').which.should.have.length_of(1)
client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
client.delete_certificate(certificateId=cert_id)
res = client.list_certificates()
res.should.have.key('certificates').which.should.have.length_of(0)
@mock_iot
def test_certs_create_inactive():
client = boto3.client('iot', region_name='ap-northeast-1')