updaated
This commit is contained in:
parent
5804441d38
commit
498419462d
9
file.tmp
Normal file
9
file.tmp
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
|
||||||
|
AWSTemplateFormatVersion: '2010-09-09'
|
||||||
|
Description: Simple CloudFormation Test Template
|
||||||
|
Resources:
|
||||||
|
S3Bucket:
|
||||||
|
Type: AWS::S3::Bucket
|
||||||
|
Properties:
|
||||||
|
AccessControl: PublicRead
|
||||||
|
BucketName: cf-test-bucket-1
|
@ -24,6 +24,15 @@ class InvalidRequestException(IoTClientError):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidStateTransitionException(IoTClientError):
|
||||||
|
def __init__(self, msg=None):
|
||||||
|
self.code = 409
|
||||||
|
super(InvalidStateTransitionException, self).__init__(
|
||||||
|
"InvalidStateTransitionException",
|
||||||
|
msg or "An attempt was made to change to an invalid state."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class VersionConflictException(IoTClientError):
|
class VersionConflictException(IoTClientError):
|
||||||
def __init__(self, name):
|
def __init__(self, name):
|
||||||
self.code = 409
|
self.code = 409
|
||||||
@ -47,4 +56,4 @@ class DeleteConflictException(IoTClientError):
|
|||||||
self.code = 409
|
self.code = 409
|
||||||
super(DeleteConflictException, self).__init__(
|
super(DeleteConflictException, self).__init__(
|
||||||
'DeleteConflictException', msg
|
'DeleteConflictException', msg
|
||||||
)
|
)
|
||||||
|
@ -13,6 +13,8 @@ import boto3
|
|||||||
|
|
||||||
from moto.core import BaseBackend, BaseModel
|
from moto.core import BaseBackend, BaseModel
|
||||||
from .exceptions import (
|
from .exceptions import (
|
||||||
|
CertificateStateException,
|
||||||
|
DeleteConflictException,
|
||||||
ResourceNotFoundException,
|
ResourceNotFoundException,
|
||||||
InvalidRequestException,
|
InvalidRequestException,
|
||||||
InvalidStateTransitionException,
|
InvalidStateTransitionException,
|
||||||
@ -226,14 +228,12 @@ class FakeJob(BaseModel):
|
|||||||
self.targets = targets
|
self.targets = targets
|
||||||
self.document_source = document_source
|
self.document_source = document_source
|
||||||
self.document = document
|
self.document = document
|
||||||
self.force = False
|
|
||||||
self.description = description
|
self.description = description
|
||||||
self.presigned_url_config = presigned_url_config
|
self.presigned_url_config = presigned_url_config
|
||||||
self.target_selection = target_selection
|
self.target_selection = target_selection
|
||||||
self.job_executions_rollout_config = job_executions_rollout_config
|
self.job_executions_rollout_config = job_executions_rollout_config
|
||||||
self.status = 'QUEUED' # IN_PROGRESS | CANCELED | COMPLETED
|
self.status = 'QUEUED' # IN_PROGRESS | CANCELED | COMPLETED
|
||||||
self.comment = None
|
self.comment = None
|
||||||
self.reason_code = None
|
|
||||||
self.created_at = time.mktime(datetime(2015, 1, 1).timetuple())
|
self.created_at = time.mktime(datetime(2015, 1, 1).timetuple())
|
||||||
self.last_updated_at = time.mktime(datetime(2015, 1, 1).timetuple())
|
self.last_updated_at = time.mktime(datetime(2015, 1, 1).timetuple())
|
||||||
self.completed_at = None
|
self.completed_at = None
|
||||||
@ -260,11 +260,9 @@ class FakeJob(BaseModel):
|
|||||||
'jobExecutionsRolloutConfig': self.job_executions_rollout_config,
|
'jobExecutionsRolloutConfig': self.job_executions_rollout_config,
|
||||||
'status': self.status,
|
'status': self.status,
|
||||||
'comment': self.comment,
|
'comment': self.comment,
|
||||||
'forceCanceled': self.force,
|
|
||||||
'reasonCode': self.reason_code,
|
|
||||||
'createdAt': self.created_at,
|
'createdAt': self.created_at,
|
||||||
'lastUpdatedAt': self.last_updated_at,
|
'lastUpdatedAt': self.last_updated_at,
|
||||||
'completedAt': self.completed_at,
|
'completedAt': self.completedAt,
|
||||||
'jobProcessDetails': self.job_process_details,
|
'jobProcessDetails': self.job_process_details,
|
||||||
'documentParameters': self.document_parameters,
|
'documentParameters': self.document_parameters,
|
||||||
'document': self.document,
|
'document': self.document,
|
||||||
@ -477,7 +475,25 @@ class IoTBackend(BaseBackend):
|
|||||||
return certificate, key_pair
|
return certificate, key_pair
|
||||||
|
|
||||||
def delete_certificate(self, certificate_id):
|
def delete_certificate(self, certificate_id):
|
||||||
self.describe_certificate(certificate_id)
|
cert = self.describe_certificate(certificate_id)
|
||||||
|
if cert.status == 'ACTIVE':
|
||||||
|
raise CertificateStateException(
|
||||||
|
'Certificate must be deactivated (not ACTIVE) before deletion.', certificate_id)
|
||||||
|
|
||||||
|
certs = [k[0] for k, v in self.principal_things.items()
|
||||||
|
if self._get_principal(k[0]).certificate_id == certificate_id]
|
||||||
|
if len(certs) > 0:
|
||||||
|
raise DeleteConflictException(
|
||||||
|
'Things must be detached before deletion (arn: %s)' % certs[0]
|
||||||
|
)
|
||||||
|
|
||||||
|
certs = [k[0] for k, v in self.principal_policies.items()
|
||||||
|
if self._get_principal(k[0]).certificate_id == certificate_id]
|
||||||
|
if len(certs) > 0:
|
||||||
|
raise DeleteConflictException(
|
||||||
|
'Certificate policies must be detached before deletion (arn: %s)' % certs[0]
|
||||||
|
)
|
||||||
|
|
||||||
del self.certificates[certificate_id]
|
del self.certificates[certificate_id]
|
||||||
|
|
||||||
def describe_certificate(self, certificate_id):
|
def describe_certificate(self, certificate_id):
|
||||||
@ -532,6 +548,14 @@ class IoTBackend(BaseBackend):
|
|||||||
return policies[0]
|
return policies[0]
|
||||||
|
|
||||||
def delete_policy(self, policy_name):
|
def delete_policy(self, policy_name):
|
||||||
|
|
||||||
|
policies = [k[1] for k, v in self.principal_policies.items() if k[1] == policy_name]
|
||||||
|
if len(policies) > 0:
|
||||||
|
raise DeleteConflictException(
|
||||||
|
'The policy cannot be deleted as the policy is attached to one or more principals (name=%s)'
|
||||||
|
% policy_name
|
||||||
|
)
|
||||||
|
|
||||||
policy = self.get_policy(policy_name)
|
policy = self.get_policy(policy_name)
|
||||||
del self.policies[policy.name]
|
del self.policies[policy.name]
|
||||||
|
|
||||||
@ -601,6 +625,14 @@ class IoTBackend(BaseBackend):
|
|||||||
pass
|
pass
|
||||||
raise ResourceNotFoundException()
|
raise ResourceNotFoundException()
|
||||||
|
|
||||||
|
def attach_policy(self, policy_name, target):
|
||||||
|
principal = self._get_principal(target)
|
||||||
|
policy = self.get_policy(policy_name)
|
||||||
|
k = (target, policy_name)
|
||||||
|
if k in self.principal_policies:
|
||||||
|
return
|
||||||
|
self.principal_policies[k] = (principal, policy)
|
||||||
|
|
||||||
def attach_principal_policy(self, policy_name, principal_arn):
|
def attach_principal_policy(self, policy_name, principal_arn):
|
||||||
principal = self._get_principal(principal_arn)
|
principal = self._get_principal(principal_arn)
|
||||||
policy = self.get_policy(policy_name)
|
policy = self.get_policy(policy_name)
|
||||||
@ -609,6 +641,15 @@ class IoTBackend(BaseBackend):
|
|||||||
return
|
return
|
||||||
self.principal_policies[k] = (principal, policy)
|
self.principal_policies[k] = (principal, policy)
|
||||||
|
|
||||||
|
def detach_policy(self, policy_name, target):
|
||||||
|
# this may raises ResourceNotFoundException
|
||||||
|
self._get_principal(target)
|
||||||
|
self.get_policy(policy_name)
|
||||||
|
k = (target, policy_name)
|
||||||
|
if k not in self.principal_policies:
|
||||||
|
raise ResourceNotFoundException()
|
||||||
|
del self.principal_policies[k]
|
||||||
|
|
||||||
def detach_principal_policy(self, policy_name, principal_arn):
|
def detach_principal_policy(self, policy_name, principal_arn):
|
||||||
# this may raises ResourceNotFoundException
|
# this may raises ResourceNotFoundException
|
||||||
self._get_principal(principal_arn)
|
self._get_principal(principal_arn)
|
||||||
@ -820,102 +861,6 @@ class IoTBackend(BaseBackend):
|
|||||||
def get_job_document(self, job_id):
|
def get_job_document(self, job_id):
|
||||||
return self.jobs[job_id]
|
return self.jobs[job_id]
|
||||||
|
|
||||||
def list_jobs(self, status, target_selection, max_results, token, thing_group_name, thing_group_id):
|
|
||||||
# TODO: implement filters
|
|
||||||
all_jobs = [_.to_dict() for _ in self.jobs.values()]
|
|
||||||
filtered_jobs = all_jobs
|
|
||||||
|
|
||||||
if token is None:
|
|
||||||
jobs = filtered_jobs[0:max_results]
|
|
||||||
next_token = str(max_results) if len(filtered_jobs) > max_results else None
|
|
||||||
else:
|
|
||||||
token = int(token)
|
|
||||||
jobs = filtered_jobs[token:token + max_results]
|
|
||||||
next_token = str(token + max_results) if len(filtered_jobs) > token + max_results else None
|
|
||||||
|
|
||||||
return jobs, next_token
|
|
||||||
|
|
||||||
def describe_job_execution(self, job_id, thing_name, execution_number):
|
|
||||||
try:
|
|
||||||
job_execution = self.job_executions[(job_id, thing_name)]
|
|
||||||
except KeyError:
|
|
||||||
raise ResourceNotFoundException()
|
|
||||||
|
|
||||||
if job_execution is None or \
|
|
||||||
(execution_number is not None and job_execution.execution_number != execution_number):
|
|
||||||
raise ResourceNotFoundException()
|
|
||||||
|
|
||||||
return job_execution
|
|
||||||
|
|
||||||
def cancel_job_execution(self, job_id, thing_name, force, expected_version, status_details):
|
|
||||||
job_execution = self.job_executions[(job_id, thing_name)]
|
|
||||||
|
|
||||||
if job_execution is None:
|
|
||||||
raise ResourceNotFoundException()
|
|
||||||
|
|
||||||
job_execution.force_canceled = force if force is not None else job_execution.force_canceled
|
|
||||||
# TODO: implement expected_version and status_details (at most 10 can be specified)
|
|
||||||
|
|
||||||
if job_execution.status == 'IN_PROGRESS' and force:
|
|
||||||
job_execution.status = 'CANCELED'
|
|
||||||
self.job_executions[(job_id, thing_name)] = job_execution
|
|
||||||
elif job_execution.status != 'IN_PROGRESS':
|
|
||||||
job_execution.status = 'CANCELED'
|
|
||||||
self.job_executions[(job_id, thing_name)] = job_execution
|
|
||||||
else:
|
|
||||||
raise InvalidStateTransitionException()
|
|
||||||
|
|
||||||
def delete_job_execution(self, job_id, thing_name, execution_number, force):
|
|
||||||
job_execution = self.job_executions[(job_id, thing_name)]
|
|
||||||
|
|
||||||
if job_execution.execution_number != execution_number:
|
|
||||||
raise ResourceNotFoundException()
|
|
||||||
|
|
||||||
if job_execution.status == 'IN_PROGRESS' and force:
|
|
||||||
del self.job_executions[(job_id, thing_name)]
|
|
||||||
elif job_execution.status != 'IN_PROGRESS':
|
|
||||||
del self.job_executions[(job_id, thing_name)]
|
|
||||||
else:
|
|
||||||
raise InvalidStateTransitionException()
|
|
||||||
|
|
||||||
def list_job_executions_for_job(self, job_id, status, max_results, next_token):
|
|
||||||
job_executions = [self.job_executions[je].to_dict() for je in self.job_executions if je[0] == job_id]
|
|
||||||
|
|
||||||
if status is not None:
|
|
||||||
job_executions = list(filter(lambda elem:
|
|
||||||
status in elem["status"] and
|
|
||||||
elem["status"] == status, job_executions))
|
|
||||||
|
|
||||||
token = next_token
|
|
||||||
if token is None:
|
|
||||||
job_executions = job_executions[0:max_results]
|
|
||||||
next_token = str(max_results) if len(job_executions) > max_results else None
|
|
||||||
else:
|
|
||||||
token = int(token)
|
|
||||||
job_executions = job_executions[token:token + max_results]
|
|
||||||
next_token = str(token + max_results) if len(job_executions) > token + max_results else None
|
|
||||||
|
|
||||||
return job_executions, next_token
|
|
||||||
|
|
||||||
def list_job_executions_for_thing(self, thing_name, status, max_results, next_token):
|
|
||||||
job_executions = [self.job_executions[je].to_dict() for je in self.job_executions if je[1] == thing_name]
|
|
||||||
|
|
||||||
if status is not None:
|
|
||||||
job_executions = list(filter(lambda elem:
|
|
||||||
status in elem["status"] and
|
|
||||||
elem["status"] == status, job_executions))
|
|
||||||
|
|
||||||
token = next_token
|
|
||||||
if token is None:
|
|
||||||
job_executions = job_executions[0:max_results]
|
|
||||||
next_token = str(max_results) if len(job_executions) > max_results else None
|
|
||||||
else:
|
|
||||||
token = int(token)
|
|
||||||
job_executions = job_executions[token:token + max_results]
|
|
||||||
next_token = str(token + max_results) if len(job_executions) > token + max_results else None
|
|
||||||
|
|
||||||
return job_executions, next_token
|
|
||||||
|
|
||||||
|
|
||||||
available_regions = boto3.session.Session().get_available_regions("iot")
|
available_regions = boto3.session.Session().get_available_regions("iot")
|
||||||
iot_backends = {region: IoTBackend(region) for region in available_regions}
|
iot_backends = {region: IoTBackend(region) for region in available_regions}
|
||||||
|
@ -115,23 +115,39 @@ class IoTResponse(BaseResponse):
|
|||||||
return json.dumps(dict())
|
return json.dumps(dict())
|
||||||
|
|
||||||
def create_job(self):
|
def create_job(self):
|
||||||
job = self.iot_backend.create_job(
|
job_arn, job_id, description = self.iot_backend.create_job(
|
||||||
job_id=self._get_param("jobId"),
|
job_id=self._get_param("jobId"),
|
||||||
targets=self._get_param("targets"),
|
targets=self._get_param("targets"),
|
||||||
description=self._get_param("description"),
|
description=self._get_param("description"),
|
||||||
document_source=self._get_param("documentSource"),
|
document_source=self._get_param("documentSource"),
|
||||||
document=self._get_param("document"),
|
document=self._get_param("document"),
|
||||||
presigned_url_config=self._get_param("presignedUrlConfig"),
|
presigned_url_config=self._get_param("presignedUrlConfig"), target_selection=self._get_param("targetSelection"),
|
||||||
target_selection=self._get_param("targetSelection"),
|
|
||||||
job_executions_rollout_config=self._get_param("jobExecutionsRolloutConfig"),
|
job_executions_rollout_config=self._get_param("jobExecutionsRolloutConfig"),
|
||||||
document_parameters=self._get_param("documentParameters")
|
document_parameters=self._get_param("documentParameters")
|
||||||
)
|
)
|
||||||
|
|
||||||
return json.dumps(job.to_dict())
|
return json.dumps(dict(jobArn=job_arn, jobId=job_id, description=description))
|
||||||
|
|
||||||
def describe_job(self):
|
def describe_job(self):
|
||||||
job = self.iot_backend.describe_job(job_id=self._get_param("jobId"))
|
job = self.iot_backend.describe_job(job_id=self._get_param("jobId"))
|
||||||
return json.dumps(dict(documentSource=job.document_source, job=job.to_dict()))
|
return json.dumps(dict(
|
||||||
|
documentSource=job.document_source,
|
||||||
|
job=dict(
|
||||||
|
comment=job.comment,
|
||||||
|
completedAt=job.completed_at,
|
||||||
|
createdAt=job.created_at,
|
||||||
|
description=job.description,
|
||||||
|
documentParameters=job.document_parameters,
|
||||||
|
jobArn=job.job_arn,
|
||||||
|
jobExecutionsRolloutConfig=job.job_executions_rollout_config,
|
||||||
|
jobId=job.job_id,
|
||||||
|
jobProcessDetails=job.job_process_details,
|
||||||
|
lastUpdatedAt=job.last_updated_at,
|
||||||
|
presignedUrlConfig=job.presigned_url_config,
|
||||||
|
status=job.status,
|
||||||
|
targets=job.targets,
|
||||||
|
targetSelection=job.target_selection
|
||||||
|
)))
|
||||||
|
|
||||||
def delete_job(self):
|
def delete_job(self):
|
||||||
job_id = self._get_param("jobId")
|
job_id = self._get_param("jobId")
|
||||||
@ -140,8 +156,6 @@ class IoTResponse(BaseResponse):
|
|||||||
self.iot_backend.delete_job(job_id=job_id,
|
self.iot_backend.delete_job(job_id=job_id,
|
||||||
force=force)
|
force=force)
|
||||||
|
|
||||||
return json.dumps(dict())
|
|
||||||
|
|
||||||
def cancel_job(self):
|
def cancel_job(self):
|
||||||
job_id = self._get_param("jobId")
|
job_id = self._get_param("jobId")
|
||||||
reason_code = self._get_param("reasonCode")
|
reason_code = self._get_param("reasonCode")
|
||||||
@ -354,19 +368,10 @@ class IoTResponse(BaseResponse):
|
|||||||
|
|
||||||
def attach_policy(self):
|
def attach_policy(self):
|
||||||
policy_name = self._get_param("policyName")
|
policy_name = self._get_param("policyName")
|
||||||
principal = self._get_param('target')
|
target = self._get_param('target')
|
||||||
self.iot_backend.attach_policy(
|
self.iot_backend.attach_policy(
|
||||||
policy_name=policy_name,
|
policy_name=policy_name,
|
||||||
target=principal,
|
target=target,
|
||||||
)
|
|
||||||
return json.dumps(dict())
|
|
||||||
|
|
||||||
def detach_policy(self):
|
|
||||||
policy_name = self._get_param("policyName")
|
|
||||||
principal = self._get_param('target')
|
|
||||||
self.iot_backend.detach_policy(
|
|
||||||
policy_name=policy_name,
|
|
||||||
target=principal,
|
|
||||||
)
|
)
|
||||||
return json.dumps(dict())
|
return json.dumps(dict())
|
||||||
|
|
||||||
@ -390,6 +395,15 @@ class IoTResponse(BaseResponse):
|
|||||||
)
|
)
|
||||||
return json.dumps(dict())
|
return json.dumps(dict())
|
||||||
|
|
||||||
|
def detach_policy(self):
|
||||||
|
policy_name = self._get_param("policyName")
|
||||||
|
target = self._get_param('target')
|
||||||
|
self.iot_backend.detach_policy(
|
||||||
|
policy_name=policy_name,
|
||||||
|
target=target,
|
||||||
|
)
|
||||||
|
return json.dumps(dict())
|
||||||
|
|
||||||
def detach_principal_policy(self):
|
def detach_principal_policy(self):
|
||||||
policy_name = self._get_param("policyName")
|
policy_name = self._get_param("policyName")
|
||||||
principal = self.headers.get('x-amzn-iot-principal')
|
principal = self.headers.get('x-amzn-iot-principal')
|
||||||
|
@ -4,8 +4,9 @@ import json
|
|||||||
import sure #noqa
|
import sure #noqa
|
||||||
import boto3
|
import boto3
|
||||||
|
|
||||||
from botocore.exceptions import ClientError
|
|
||||||
from moto import mock_iot
|
from moto import mock_iot
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
from nose.tools import assert_raises
|
||||||
|
|
||||||
@mock_iot
|
@mock_iot
|
||||||
def test_attach_policy():
|
def test_attach_policy():
|
||||||
@ -384,6 +385,96 @@ def test_certs():
|
|||||||
res.should.have.key('certificates').which.should.have.length_of(0)
|
res.should.have.key('certificates').which.should.have.length_of(0)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_iot
|
||||||
|
def test_delete_policy_validation():
|
||||||
|
doc = """{
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement":[
|
||||||
|
{
|
||||||
|
"Effect":"Allow",
|
||||||
|
"Action":[
|
||||||
|
"iot: *"
|
||||||
|
],
|
||||||
|
"Resource":"*"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||||||
|
cert_arn = cert['certificateArn']
|
||||||
|
policy_name = 'my-policy'
|
||||||
|
client.create_policy(policyName=policy_name, policyDocument=doc)
|
||||||
|
client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||||||
|
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.delete_policy(policyName=policy_name)
|
||||||
|
e.exception.response['Error']['Message'].should.contain(
|
||||||
|
'The policy cannot be deleted as the policy is attached to one or more principals (name=%s)' % policy_name)
|
||||||
|
res = client.list_policies()
|
||||||
|
res.should.have.key('policies').which.should.have.length_of(1)
|
||||||
|
|
||||||
|
client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||||||
|
client.delete_policy(policyName=policy_name)
|
||||||
|
res = client.list_policies()
|
||||||
|
res.should.have.key('policies').which.should.have.length_of(0)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_iot
|
||||||
|
def test_delete_certificate_validation():
|
||||||
|
doc = """{
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement":[
|
||||||
|
{
|
||||||
|
"Effect":"Allow",
|
||||||
|
"Action":[
|
||||||
|
"iot: *"
|
||||||
|
],
|
||||||
|
"Resource":"*"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||||||
|
cert_id = cert['certificateId']
|
||||||
|
cert_arn = cert['certificateArn']
|
||||||
|
policy_name = 'my-policy'
|
||||||
|
thing_name = 'thing-1'
|
||||||
|
client.create_policy(policyName=policy_name, policyDocument=doc)
|
||||||
|
client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||||||
|
client.create_thing(thingName=thing_name)
|
||||||
|
client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
|
||||||
|
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.delete_certificate(certificateId=cert_id)
|
||||||
|
e.exception.response['Error']['Message'].should.contain(
|
||||||
|
'Certificate must be deactivated (not ACTIVE) before deletion.')
|
||||||
|
res = client.list_certificates()
|
||||||
|
res.should.have.key('certificates').which.should.have.length_of(1)
|
||||||
|
|
||||||
|
client.update_certificate(certificateId=cert_id, newStatus='REVOKED')
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.delete_certificate(certificateId=cert_id)
|
||||||
|
e.exception.response['Error']['Message'].should.contain(
|
||||||
|
'Things must be detached before deletion (arn: %s)' % cert_arn)
|
||||||
|
res = client.list_certificates()
|
||||||
|
res.should.have.key('certificates').which.should.have.length_of(1)
|
||||||
|
|
||||||
|
client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.delete_certificate(certificateId=cert_id)
|
||||||
|
e.exception.response['Error']['Message'].should.contain(
|
||||||
|
'Certificate policies must be detached before deletion (arn: %s)' % cert_arn)
|
||||||
|
res = client.list_certificates()
|
||||||
|
res.should.have.key('certificates').which.should.have.length_of(1)
|
||||||
|
|
||||||
|
client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||||||
|
client.delete_certificate(certificateId=cert_id)
|
||||||
|
res = client.list_certificates()
|
||||||
|
res.should.have.key('certificates').which.should.have.length_of(0)
|
||||||
|
|
||||||
|
|
||||||
@mock_iot
|
@mock_iot
|
||||||
def test_certs_create_inactive():
|
def test_certs_create_inactive():
|
||||||
client = boto3.client('iot', region_name='ap-northeast-1')
|
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||||
@ -432,6 +523,47 @@ def test_policy():
|
|||||||
|
|
||||||
@mock_iot
|
@mock_iot
|
||||||
def test_principal_policy():
|
def test_principal_policy():
|
||||||
|
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||||
|
policy_name = 'my-policy'
|
||||||
|
doc = '{}'
|
||||||
|
client.create_policy(policyName=policy_name, policyDocument=doc)
|
||||||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||||||
|
cert_arn = cert['certificateArn']
|
||||||
|
|
||||||
|
client.attach_policy(policyName=policy_name, target=cert_arn)
|
||||||
|
|
||||||
|
res = client.list_principal_policies(principal=cert_arn)
|
||||||
|
res.should.have.key('policies').which.should.have.length_of(1)
|
||||||
|
for policy in res['policies']:
|
||||||
|
policy.should.have.key('policyName').which.should_not.be.none
|
||||||
|
policy.should.have.key('policyArn').which.should_not.be.none
|
||||||
|
|
||||||
|
# do nothing if policy have already attached to certificate
|
||||||
|
client.attach_policy(policyName=policy_name, target=cert_arn)
|
||||||
|
|
||||||
|
res = client.list_principal_policies(principal=cert_arn)
|
||||||
|
res.should.have.key('policies').which.should.have.length_of(1)
|
||||||
|
for policy in res['policies']:
|
||||||
|
policy.should.have.key('policyName').which.should_not.be.none
|
||||||
|
policy.should.have.key('policyArn').which.should_not.be.none
|
||||||
|
|
||||||
|
res = client.list_policy_principals(policyName=policy_name)
|
||||||
|
res.should.have.key('principals').which.should.have.length_of(1)
|
||||||
|
for principal in res['principals']:
|
||||||
|
principal.should_not.be.none
|
||||||
|
|
||||||
|
client.detach_policy(policyName=policy_name, target=cert_arn)
|
||||||
|
res = client.list_principal_policies(principal=cert_arn)
|
||||||
|
res.should.have.key('policies').which.should.have.length_of(0)
|
||||||
|
res = client.list_policy_principals(policyName=policy_name)
|
||||||
|
res.should.have.key('principals').which.should.have.length_of(0)
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.detach_policy(policyName=policy_name, target=cert_arn)
|
||||||
|
e.exception.response['Error']['Code'].should.equal('ResourceNotFoundException')
|
||||||
|
|
||||||
|
|
||||||
|
@mock_iot
|
||||||
|
def test_principal_policy_deprecated():
|
||||||
client = boto3.client('iot', region_name='ap-northeast-1')
|
client = boto3.client('iot', region_name='ap-northeast-1')
|
||||||
policy_name = 'my-policy'
|
policy_name = 'my-policy'
|
||||||
doc = '{}'
|
doc = '{}'
|
||||||
|
Loading…
x
Reference in New Issue
Block a user