# NOTE: file-size banner captured with this listing (not part of the module): 950 lines, 38 KiB
from __future__ import unicode_literals

import hashlib
import random
import re
import string
import time
import uuid
from collections import OrderedDict
from datetime import datetime

import boto3

from moto.core import BaseBackend, BaseModel
from .exceptions import (
    CertificateStateException,
    DeleteConflictException,
    InvalidRequestException,
    InvalidStateTransitionException,
    ResourceNotFoundException,
    VersionConflictException,
)
class FakeThing(BaseModel):
    """In-memory record of a single IoT thing."""

    def __init__(self, thing_name, thing_type, attributes, region_name):
        """Store the thing's identity and derive its ARN.

        ``thing_type`` is either ``None`` or an object exposing
        ``thing_type_name`` (see ``to_dict``).
        """
        self.region_name = region_name
        self.thing_name = thing_name
        self.thing_type = thing_type
        self.attributes = attributes
        self.arn = 'arn:aws:iot:%s:1:thing/%s' % (region_name, thing_name)
        # TODO: we need to handle 'version'?
        self.version = 1

        # for iot-data
        self.thing_shadow = None

    def to_dict(self, include_default_client_id=False):
        """Return the thing as a response dictionary.

        ``thingTypeName`` is only present when the thing has a type;
        ``defaultClientId`` (equal to the thing name) is only present when
        ``include_default_client_id`` is true.
        """
        description = {
            'thingName': self.thing_name,
            'thingArn': self.arn,
            'attributes': self.attributes,
            'version': self.version,
        }
        thing_type = self.thing_type
        if thing_type:
            description['thingTypeName'] = thing_type.thing_type_name
        if include_default_client_id:
            description['defaultClientId'] = self.thing_name
        return description
class FakeThingType(BaseModel):
    """In-memory record of an IoT thing type."""

    def __init__(self, thing_type_name, thing_type_properties, region_name):
        """Store the type's name/properties and generate its id, ARN and metadata."""
        self.region_name = region_name
        self.thing_type_name = thing_type_name
        self.thing_type_properties = thing_type_properties
        self.thing_type_id = str(uuid.uuid4())  # I don't know the rule of id
        t = time.time()
        self.metadata = {
            'deprecated': False,
            # Timestamp truncated to millisecond precision.  The key was
            # previously misspelled 'creationData'; the API field name is
            # 'creationDate'.
            'creationDate': int(t * 1000) / 1000.0,
        }
        self.arn = 'arn:aws:iot:%s:1:thingtype/%s' % (self.region_name, thing_type_name)

    def to_dict(self):
        """Return the thing type as a response dictionary."""
        return {
            'thingTypeName': self.thing_type_name,
            'thingTypeId': self.thing_type_id,
            'thingTypeProperties': self.thing_type_properties,
            'thingTypeMetadata': self.metadata,
        }
class FakeThingGroup(BaseModel):
    """In-memory record of an IoT thing group and the things it contains."""

    def __init__(self, thing_group_name, parent_group_name, thing_group_properties, region_name):
        """Store the group's identity; ``thing_group_properties`` defaults to ``{}``."""
        self.region_name = region_name
        self.thing_group_name = thing_group_name
        self.thing_group_id = str(uuid.uuid4())  # I don't know the rule of id
        self.version = 1  # TODO: tmp
        self.parent_group_name = parent_group_name
        self.thing_group_properties = thing_group_properties or {}
        t = time.time()
        self.metadata = {
            # Timestamp truncated to millisecond precision.  The key was
            # previously misspelled 'creationData'; the API field name is
            # 'creationDate'.
            'creationDate': int(t * 1000) / 1000.0,
        }
        self.arn = 'arn:aws:iot:%s:1:thinggroup/%s' % (self.region_name, thing_group_name)
        # Member things keyed by thing ARN, in insertion order.
        self.things = OrderedDict()

    def to_dict(self):
        """Return the thing group as a response dictionary."""
        return {
            'thingGroupName': self.thing_group_name,
            'thingGroupId': self.thing_group_id,
            'version': self.version,
            'thingGroupProperties': self.thing_group_properties,
            'thingGroupMetadata': self.metadata,
        }
class FakeCertificate(BaseModel):
    """In-memory record of an IoT X.509 certificate."""

    def __init__(self, certificate_pem, status, region_name):
        """Create a certificate with a freshly generated, unique id.

        The id is the SHA-256 hex digest of a random UUID.  Hashing *some*
        per-instance data is essential: a digest of empty input would give
        every certificate the same id (and ARN), so certificates would
        overwrite each other wherever they are keyed by id.
        """
        m = hashlib.sha256()
        m.update(str(uuid.uuid4()).encode('utf-8'))
        self.certificate_id = m.hexdigest()
        self.arn = 'arn:aws:iot:%s:1:cert/%s' % (region_name, self.certificate_id)
        self.certificate_pem = certificate_pem
        self.status = status

        # TODO: must adjust
        self.owner = '1'
        self.transfer_data = {}
        self.creation_date = time.time()
        self.last_modified_date = self.creation_date
        self.ca_certificate_id = None

    def to_dict(self):
        """Return the short (listing) form of the certificate."""
        return {
            'certificateArn': self.arn,
            'certificateId': self.certificate_id,
            'status': self.status,
            'creationDate': self.creation_date,
        }

    def to_description_dict(self):
        """Return the detailed (describe) form of the certificate.

        You might need keys below in some situation
          - caCertificateId
          - previousOwnedBy
        """
        return {
            'certificateArn': self.arn,
            'certificateId': self.certificate_id,
            'status': self.status,
            'certificatePem': self.certificate_pem,
            'ownedBy': self.owner,
            'creationDate': self.creation_date,
            'lastModifiedDate': self.last_modified_date,
            'transferData': self.transfer_data,
        }
class FakePolicy(BaseModel):
    """In-memory record of an IoT policy together with its versions."""

    def __init__(self, name, document, region_name, default_version_id='1'):
        """Create the policy and seed ``versions`` with one default version."""
        self.name = name
        self.document = document
        self.arn = 'arn:aws:iot:%s:1:policy/%s' % (region_name, name)
        self.default_version_id = default_version_id
        initial_version = FakePolicyVersion(self.name, document, True, region_name)
        self.versions = [initial_version]

    def _summary(self):
        # Name/ARN pair shared by every serialised form of the policy.
        return {
            'policyName': self.name,
            'policyArn': self.arn,
        }

    def to_get_dict(self):
        """Return the form that carries the document and the default version id."""
        described = self._summary()
        described['policyDocument'] = self.document
        described['defaultVersionId'] = self.default_version_id
        return described

    def to_dict_at_creation(self):
        """Return the form that carries the document and ``policyVersionId``."""
        created = self._summary()
        created['policyDocument'] = self.document
        created['policyVersionId'] = self.default_version_id
        return created

    def to_dict(self):
        """Return the short form: policy name and ARN only."""
        return self._summary()
class FakePolicyVersion(object):
    """In-memory record of a single version of an IoT policy."""

    def __init__(self,
                 policy_name,
                 document,
                 is_default,
                 region_name):
        """Create a policy version.

        :param policy_name: name of the policy this version belongs to
        :param document: policy document; a falsy value is stored as ``{}``
        :param is_default: whether this version is the policy's default
        :param region_name: region used to build the policy ARN

        ``version_id`` starts as ``'1'``; callers that add further versions
        overwrite it.  The timestamps are fixed placeholder dates.
        """
        self.name = policy_name
        self.arn = 'arn:aws:iot:%s:1:policy/%s' % (region_name, policy_name)
        self.document = document or {}
        self.is_default = is_default
        self.version_id = '1'

        self.create_datetime = time.mktime(datetime(2015, 1, 1).timetuple())
        self.last_modified_datetime = time.mktime(datetime(2015, 1, 2).timetuple())

    def to_get_dict(self):
        """Return the detailed form of the version."""
        return {
            'policyName': self.name,
            'policyArn': self.arn,
            'policyDocument': self.document,
            'policyVersionId': self.version_id,
            'isDefaultVersion': self.is_default,
            'creationDate': self.create_datetime,
            'lastModifiedDate': self.last_modified_datetime,
            # The version id doubles as the generation id.
            'generationId': self.version_id,
        }

    def to_dict_at_creation(self):
        """Return the form used right after the version is created."""
        return {
            'policyArn': self.arn,
            'policyDocument': self.document,
            'policyVersionId': self.version_id,
            'isDefaultVersion': self.is_default,
        }

    def to_dict(self):
        """Return the short (listing) form of the version."""
        return {
            'versionId': self.version_id,
            'isDefaultVersion': self.is_default,
            'createDate': self.create_datetime,
        }
class FakeJob(BaseModel):
    """In-memory record of an IoT job."""

    JOB_ID_REGEX_PATTERN = "[a-zA-Z0-9_-]"
    # The character class is repeated and anchored at the end so that, used
    # with ``match`` (which anchors at the start), *every* character of the
    # id is validated rather than only the first one.
    JOB_ID_REGEX = re.compile("(?:%s)+\\Z" % JOB_ID_REGEX_PATTERN)

    def __init__(self, job_id, targets, document_source, document, description, presigned_url_config, target_selection,
                 job_executions_rollout_config, document_parameters, region_name):
        """Create a job in the ``QUEUED`` state.

        :raises InvalidRequestException: if ``job_id`` is empty, longer than
            64 characters, or contains characters outside ``[a-zA-Z0-9_-]``.
        """
        if not self._job_id_matcher(self.JOB_ID_REGEX, job_id):
            raise InvalidRequestException()

        self.region_name = region_name
        self.job_id = job_id
        self.job_arn = 'arn:aws:iot:%s:1:job/%s' % (self.region_name, job_id)
        self.targets = targets
        self.document_source = document_source
        self.document = document
        self.force = False
        self.description = description
        self.presigned_url_config = presigned_url_config
        self.target_selection = target_selection
        self.job_executions_rollout_config = job_executions_rollout_config
        # Read by to_dict() and by the backend's delete/cancel logic, so it
        # must exist from construction on.
        self.status = 'QUEUED'  # IN_PROGRESS | CANCELED | COMPLETED
        self.comment = None
        self.reason_code = None
        # Fixed placeholder timestamps.
        self.created_at = time.mktime(datetime(2015, 1, 1).timetuple())
        self.last_updated_at = time.mktime(datetime(2015, 1, 1).timetuple())
        self.completed_at = None
        self.job_process_details = {
            'processingTargets': targets,
            'numberOfQueuedThings': 1,
            'numberOfCanceledThings': 0,
            'numberOfSucceededThings': 0,
            'numberOfFailedThings': 0,
            'numberOfRejectedThings': 0,
            'numberOfInProgressThings': 0,
            'numberOfRemovedThings': 0,
        }
        self.document_parameters = document_parameters

    def to_dict(self):
        """Return the job as a response dictionary."""
        obj = {
            'jobArn': self.job_arn,
            'jobId': self.job_id,
            'targets': self.targets,
            'description': self.description,
            'presignedUrlConfig': self.presigned_url_config,
            'targetSelection': self.target_selection,
            'jobExecutionsRolloutConfig': self.job_executions_rollout_config,
            'status': self.status,
            'comment': self.comment,
            'forceCanceled': self.force,
            'reasonCode': self.reason_code,
            'createdAt': self.created_at,
            'lastUpdatedAt': self.last_updated_at,
            'completedAt': self.completed_at,
            'jobProcessDetails': self.job_process_details,
            'documentParameters': self.document_parameters,
            'document': self.document,
            'documentSource': self.document_source,
        }

        return obj

    def _job_id_matcher(self, regex, argument):
        """Return True when ``argument`` matches ``regex`` and is at most 64 characters.

        ``None`` is treated as a non-matching id instead of raising.
        """
        if argument is None:
            return False
        regex_match = regex.match(argument) is not None
        length_match = len(argument) <= 64
        return regex_match and length_match
class FakeJobExecution(BaseModel):
    """In-memory record of one job's execution on one thing."""

    def __init__(self, job_id, thing_arn, status='QUEUED', force_canceled=False, status_details_map=None):
        """Create a job execution.

        ``status_details_map`` defaults to a fresh empty dict per instance
        (a ``{}`` default in the signature would be shared by every
        execution created without an explicit map).
        """
        self.job_id = job_id
        self.status = status  # IN_PROGRESS | CANCELED | COMPLETED
        self.force_canceled = force_canceled
        self.status_details_map = {} if status_details_map is None else status_details_map
        self.thing_arn = thing_arn
        # Fixed placeholder timestamps and counters.
        self.queued_at = time.mktime(datetime(2015, 1, 1).timetuple())
        self.started_at = time.mktime(datetime(2015, 1, 1).timetuple())
        self.last_updated_at = time.mktime(datetime(2015, 1, 1).timetuple())
        self.execution_number = 123
        self.version_number = 123
        self.approximate_seconds_before_time_out = 123

    def to_get_dict(self):
        """Return the detailed (describe) form of the execution."""
        obj = {
            'jobId': self.job_id,
            'status': self.status,
            'forceCanceled': self.force_canceled,
            'statusDetails': {'detailsMap': self.status_details_map},
            'thingArn': self.thing_arn,
            'queuedAt': self.queued_at,
            'startedAt': self.started_at,
            'lastUpdatedAt': self.last_updated_at,
            'executionNumber': self.execution_number,
            'versionNumber': self.version_number,
            'approximateSecondsBeforeTimedOut': self.approximate_seconds_before_time_out,
        }

        return obj

    def to_dict(self):
        """Return the listing form; status and timestamps are nested under
        ``jobExecutionSummary``."""
        obj = {
            'jobId': self.job_id,
            'thingArn': self.thing_arn,
            'jobExecutionSummary': {
                'status': self.status,
                'queuedAt': self.queued_at,
                'startedAt': self.started_at,
                'lastUpdatedAt': self.last_updated_at,
                'executionNumber': self.execution_number,
            },
        }

        return obj
class IoTBackend(BaseBackend):
    """In-memory backend emulating the AWS IoT control plane for one region.

    Storage layout: things, thing types and thing groups are keyed by ARN;
    certificates by certificate id; policies by name; jobs by job id; job
    executions by ``(job_id, thing_name)``.  Principal attachments are keyed
    by ``(principal_arn, policy_name)`` and ``(principal_arn, thing_name)``.
    """

    def __init__(self, region_name=None):
        super(IoTBackend, self).__init__()
        self.region_name = region_name
        self.things = OrderedDict()
        self.jobs = OrderedDict()
        self.job_executions = OrderedDict()
        self.thing_types = OrderedDict()
        self.thing_groups = OrderedDict()
        self.certificates = OrderedDict()
        self.policies = OrderedDict()
        self.principal_policies = OrderedDict()
        self.principal_things = OrderedDict()

    def reset(self):
        """Discard all state and re-initialise the backend for the same region."""
        region_name = self.region_name
        self.__dict__ = {}
        # Without re-running __init__ the instance would be left with no
        # attributes at all.
        self.__init__(region_name)

    @staticmethod
    def _paginate(items, max_results, token):
        """Return ``(page, next_token)`` for a list and an offset-style token.

        ``token`` is ``None`` for the first page, otherwise the stringified
        start offset.  ``next_token`` is the stringified offset of the next
        page, or ``None`` when ``items`` is exhausted.
        """
        start = 0 if token is None else int(token)
        end = start + max_results
        next_token = str(end) if len(items) > end else None
        return items[start:end], next_token

    # ------------------------------------------------------------------
    # things and thing types
    # ------------------------------------------------------------------
    def create_thing(self, thing_name, thing_type_name, attribute_payload):
        """Create a thing and return ``(thing_name, thing_arn)``.

        :raises ResourceNotFoundException: if ``thing_type_name`` is given
            but no such thing type exists.
        """
        thing_type = None
        if thing_type_name:
            thing_type = self.describe_thing_type(thing_type_name)
        # A missing payload or a payload without 'attributes' means "none".
        attributes = (attribute_payload or {}).get('attributes', {})
        thing = FakeThing(thing_name, thing_type, attributes, self.region_name)
        self.things[thing.arn] = thing
        return thing.thing_name, thing.arn

    def create_thing_type(self, thing_type_name, thing_type_properties):
        """Create a thing type and return ``(thing_type_name, thing_type_arn)``."""
        if thing_type_properties is None:
            thing_type_properties = {}
        thing_type = FakeThingType(thing_type_name, thing_type_properties, self.region_name)
        self.thing_types[thing_type.arn] = thing_type
        return thing_type.thing_type_name, thing_type.arn

    def list_thing_types(self, thing_type_name=None):
        """Return all thing types, or those whose name starts with ``thing_type_name``."""
        if thing_type_name:
            # It's weird but thing_type_name is filtered by forward match, not complete match
            return [_ for _ in self.thing_types.values() if _.thing_type_name.startswith(thing_type_name)]
        return self.thing_types.values()

    def list_things(self, attribute_name, attribute_value, thing_type_name, max_results, token):
        """Return ``(things, next_token)``: one page of thing dicts.

        Things are filtered by exact attribute name/value and/or exact thing
        type name; a filter that is ``None`` is not applied.
        """
        def matches(elem):
            if attribute_name is not None:
                attributes = elem["attributes"]
                if attribute_name not in attributes or attributes[attribute_name] != attribute_value:
                    return False
            if thing_type_name is not None and elem.get("thingTypeName") != thing_type_name:
                return False
            return True

        filtered_things = [elem for elem in (_.to_dict() for _ in self.things.values()) if matches(elem)]
        return self._paginate(filtered_things, max_results, token)

    def describe_thing(self, thing_name):
        """Return the thing named ``thing_name``.

        :raises ResourceNotFoundException: if there is no such thing.
        """
        for thing in self.things.values():
            if thing.thing_name == thing_name:
                return thing
        raise ResourceNotFoundException()

    def describe_thing_type(self, thing_type_name):
        """Return the thing type named exactly ``thing_type_name``.

        :raises ResourceNotFoundException: if there is no such thing type.
        """
        for thing_type in self.thing_types.values():
            if thing_type.thing_type_name == thing_type_name:
                return thing_type
        raise ResourceNotFoundException()

    def delete_thing(self, thing_name, expected_version):
        """Delete a thing by name (``expected_version`` is not checked yet)."""
        # TODO: handle expected_version

        # can raise ResourceNotFoundError
        thing = self.describe_thing(thing_name)
        del self.things[thing.arn]

    def delete_thing_type(self, thing_type_name):
        """Delete a thing type by name."""
        # can raise ResourceNotFoundError
        thing_type = self.describe_thing_type(thing_type_name)
        del self.thing_types[thing_type.arn]

    def update_thing(self, thing_name, thing_type_name, attribute_payload, expected_version, remove_thing_type):
        """Update a thing's type and/or attributes.

        :raises InvalidRequestException: if asked to both set and remove the
            thing type.
        :raises ResourceNotFoundException: if the thing or the requested
            thing type does not exist.
        """
        # if attributes payload = {}, nothing
        thing = self.describe_thing(thing_name)

        if remove_thing_type and thing_type_name:
            raise InvalidRequestException()

        # thing_type
        if thing_type_name:
            thing.thing_type = self.describe_thing_type(thing_type_name)

        if remove_thing_type:
            thing.thing_type = None

        # attribute
        if attribute_payload is not None and 'attributes' in attribute_payload:
            attributes = attribute_payload['attributes']
            if attribute_payload.get('merge', False):
                # merge=True adds/overwrites keys and keeps the others.
                thing.attributes.update(attributes)
            else:
                thing.attributes = attributes

    # ------------------------------------------------------------------
    # certificates
    # ------------------------------------------------------------------
    def _random_string(self):
        """Return 20 random ASCII letters/digits."""
        n = 20
        random_str = ''.join([random.choice(string.ascii_letters + string.digits) for i in range(n)])
        return random_str

    def create_keys_and_certificate(self, set_as_active):
        """Create a certificate with random placeholder key material.

        Returns ``(certificate, key_pair)``; the certificate is ``ACTIVE``
        when ``set_as_active`` is true, otherwise ``INACTIVE``.
        """
        # implement here
        # caCertificate can be blank
        key_pair = {
            'PublicKey': self._random_string(),
            'PrivateKey': self._random_string(),
        }
        certificate_pem = self._random_string()
        status = 'ACTIVE' if set_as_active else 'INACTIVE'
        certificate = FakeCertificate(certificate_pem, status, self.region_name)
        self.certificates[certificate.certificate_id] = certificate
        return certificate, key_pair

    def delete_certificate(self, certificate_id):
        """Delete a certificate.

        :raises ResourceNotFoundException: if the certificate does not exist.
        :raises CertificateStateException: if the certificate is ``ACTIVE``.
        :raises DeleteConflictException: if things or policies are still
            attached to the certificate.
        """
        cert = self.describe_certificate(certificate_id)
        if cert.status == 'ACTIVE':
            raise CertificateStateException(
                'Certificate must be deactivated (not ACTIVE) before deletion.', certificate_id)

        certs = [k[0] for k, v in self.principal_things.items()
                 if self._get_principal(k[0]).certificate_id == certificate_id]
        if len(certs) > 0:
            raise DeleteConflictException(
                'Things must be detached before deletion (arn: %s)' % certs[0]
            )

        certs = [k[0] for k, v in self.principal_policies.items()
                 if self._get_principal(k[0]).certificate_id == certificate_id]
        if len(certs) > 0:
            raise DeleteConflictException(
                'Certificate policies must be detached before deletion (arn: %s)' % certs[0]
            )

        del self.certificates[certificate_id]

    def describe_certificate(self, certificate_id):
        """Return the certificate with the given id.

        :raises ResourceNotFoundException: if there is no such certificate.
        """
        for cert in self.certificates.values():
            if cert.certificate_id == certificate_id:
                return cert
        raise ResourceNotFoundException()

    def list_certificates(self):
        """Return all certificates."""
        return self.certificates.values()

    def update_certificate(self, certificate_id, new_status):
        """Set a certificate's status (the new status is not validated yet)."""
        cert = self.describe_certificate(certificate_id)
        # TODO: validate new_status
        cert.status = new_status

    # ------------------------------------------------------------------
    # policies
    # ------------------------------------------------------------------
    def create_policy(self, policy_name, policy_document):
        """Create and return a policy, stored under its name."""
        policy = FakePolicy(policy_name, policy_document, self.region_name)
        self.policies[policy.name] = policy
        return policy

    def attach_policy(self, policy_name, target):
        """Attach a policy to a principal; attaching twice is a no-op.

        :raises ResourceNotFoundException: if the principal or the policy
            does not exist.
        """
        principal = self._get_principal(target)
        policy = self.get_policy(policy_name)
        k = (target, policy_name)
        if k in self.principal_policies:
            return
        self.principal_policies[k] = (principal, policy)

    def detach_policy(self, policy_name, target):
        """Detach a policy from a principal.

        :raises ResourceNotFoundException: if the policy is not attached.
        """
        # this may raises ResourceNotFoundException
        self._get_principal(target)
        self.get_policy(policy_name)

        k = (target, policy_name)
        if k not in self.principal_policies:
            raise ResourceNotFoundException()
        del self.principal_policies[k]

    def list_attached_policies(self, target):
        """Return the policies attached to the principal ``target``."""
        policies = [v[1] for k, v in self.principal_policies.items() if k[0] == target]
        return policies

    def list_policies(self):
        """Return all policies."""
        policies = self.policies.values()
        return policies

    def get_policy(self, policy_name):
        """Return the policy named ``policy_name``.

        :raises ResourceNotFoundException: if there is no such policy.
        """
        for policy in self.policies.values():
            if policy.name == policy_name:
                return policy
        raise ResourceNotFoundException()

    def delete_policy(self, policy_name):
        """Delete a policy.

        :raises DeleteConflictException: if the policy is still attached to
            a principal.
        :raises ResourceNotFoundException: if there is no such policy.
        """
        if any(k[1] == policy_name for k in self.principal_policies):
            raise DeleteConflictException(
                'The policy cannot be deleted as the policy is attached to one or more principals (name=%s)'
                % policy_name
            )

        policy = self.get_policy(policy_name)
        del self.policies[policy.name]

    def create_policy_version(self, policy_name, policy_document, set_as_default):
        """Add a new version to a policy and return it.

        The new version's id is its 1-based position in the policy's
        version list.
        """
        policy = self.get_policy(policy_name)  # raises if the policy is missing
        version = FakePolicyVersion(policy_name, policy_document, set_as_default, self.region_name)
        # Store the version *before* numbering it: otherwise it would never
        # be retrievable and its id would collide with the initial version.
        policy.versions.append(version)
        version.version_id = '{0}'.format(len(policy.versions))
        if set_as_default:
            self.set_default_policy_version(policy_name, version.version_id)
        return version

    def set_default_policy_version(self, policy_name, version_id):
        """Make ``version_id`` the default version of the policy.

        Every other version is flagged non-default, and the policy's
        document follows the new default version.
        """
        policy = self.get_policy(policy_name)  # raises if the policy is missing
        for version in policy.versions:
            if version.version_id == version_id:
                version.is_default = True
                policy.default_version_id = version.version_id
                policy.document = version.document
            else:
                version.is_default = False

    def get_policy_version(self, policy_name, version_id):
        """Return one version of a policy.

        :raises ResourceNotFoundException: if the policy or version is missing.
        """
        policy = self.get_policy(policy_name)
        for version in policy.versions:
            if version.version_id == version_id:
                return version
        raise ResourceNotFoundException()

    def list_policy_versions(self, policy_name):
        """Return all versions of a policy."""
        policy = self.get_policy(policy_name)  # raises if the policy is missing
        return policy.versions

    def delete_policy_version(self, policy_name, version_id):
        """Delete a non-default version of a policy.

        :raises InvalidRequestException: if ``version_id`` is the default.
        :raises ResourceNotFoundException: if the policy or version is missing.
        """
        policy = self.get_policy(policy_name)
        if version_id == policy.default_version_id:
            raise InvalidRequestException(
                "Cannot delete the default version of a policy")
        for i, v in enumerate(policy.versions):
            if v.version_id == version_id:
                del policy.versions[i]
                # Stop here: falling through would report "not found".
                return
        raise ResourceNotFoundException()

    # ------------------------------------------------------------------
    # principals
    # ------------------------------------------------------------------
    def _get_principal(self, principal_arn):
        """Resolve a principal ARN; only certificate ARNs are supported.

        raise ResourceNotFoundException
        """
        if ':cert/' in principal_arn:
            certs = [_ for _ in self.certificates.values() if _.arn == principal_arn]
            if len(certs) == 0:
                raise ResourceNotFoundException()
            principal = certs[0]
            return principal
        # TODO: search for cognito_ids
        raise ResourceNotFoundException()

    def attach_principal_policy(self, policy_name, principal_arn):
        """Attach a policy to a principal; attaching twice is a no-op."""
        principal = self._get_principal(principal_arn)
        policy = self.get_policy(policy_name)
        k = (principal_arn, policy_name)
        if k in self.principal_policies:
            return
        self.principal_policies[k] = (principal, policy)

    def detach_principal_policy(self, policy_name, principal_arn):
        """Detach a policy from a principal.

        :raises ResourceNotFoundException: if the policy is not attached.
        """
        # this may raises ResourceNotFoundException
        self._get_principal(principal_arn)
        self.get_policy(policy_name)

        k = (principal_arn, policy_name)
        if k not in self.principal_policies:
            raise ResourceNotFoundException()
        del self.principal_policies[k]

    def list_principal_policies(self, principal_arn):
        """Return the policies attached to ``principal_arn``."""
        policies = [v[1] for k, v in self.principal_policies.items() if k[0] == principal_arn]
        return policies

    def list_policy_principals(self, policy_name):
        """Return the ARNs of the principals ``policy_name`` is attached to."""
        principals = [k[0] for k, v in self.principal_policies.items() if k[1] == policy_name]
        return principals

    def attach_thing_principal(self, thing_name, principal_arn):
        """Attach a principal to a thing; attaching twice is a no-op."""
        principal = self._get_principal(principal_arn)
        thing = self.describe_thing(thing_name)
        k = (principal_arn, thing_name)
        if k in self.principal_things:
            return
        self.principal_things[k] = (principal, thing)

    def detach_thing_principal(self, thing_name, principal_arn):
        """Detach a principal from a thing.

        :raises ResourceNotFoundException: if the principal is not attached.
        """
        # this may raises ResourceNotFoundException
        self._get_principal(principal_arn)
        self.describe_thing(thing_name)

        k = (principal_arn, thing_name)
        if k not in self.principal_things:
            raise ResourceNotFoundException()
        del self.principal_things[k]

    def list_principal_things(self, principal_arn):
        """Return the names of the things ``principal_arn`` is attached to."""
        # Keys are (principal_arn, thing_name): the thing name is element 1.
        thing_names = [k[1] for k, v in self.principal_things.items() if k[0] == principal_arn]
        return thing_names

    def list_thing_principals(self, thing_name):
        """Return the ARNs of the principals attached to ``thing_name``."""
        principals = [k[0] for k, v in self.principal_things.items() if k[1] == thing_name]
        return principals

    # ------------------------------------------------------------------
    # thing groups
    # ------------------------------------------------------------------
    def describe_thing_group(self, thing_group_name):
        """Return the thing group named ``thing_group_name``.

        :raises ResourceNotFoundException: if there is no such group.
        """
        for thing_group in self.thing_groups.values():
            if thing_group.thing_group_name == thing_group_name:
                return thing_group
        raise ResourceNotFoundException()

    def create_thing_group(self, thing_group_name, parent_group_name, thing_group_properties):
        """Create a thing group and return ``(name, arn, id)``."""
        thing_group = FakeThingGroup(thing_group_name, parent_group_name, thing_group_properties, self.region_name)
        self.thing_groups[thing_group.arn] = thing_group
        return thing_group.thing_group_name, thing_group.arn, thing_group.thing_group_id

    def delete_thing_group(self, thing_group_name, expected_version):
        """Delete a thing group by name (``expected_version`` is not checked)."""
        thing_group = self.describe_thing_group(thing_group_name)
        del self.thing_groups[thing_group.arn]

    def list_thing_groups(self, parent_group, name_prefix_filter, recursive):
        """Return all thing groups; the filter arguments are currently ignored."""
        thing_groups = self.thing_groups.values()
        return thing_groups

    def update_thing_group(self, thing_group_name, thing_group_properties, expected_version):
        """Update a group's attribute payload and return its new version.

        :raises VersionConflictException: if ``expected_version`` is given
            and differs from the group's current version.
        """
        thing_group = self.describe_thing_group(thing_group_name)
        if expected_version and expected_version != thing_group.version:
            raise VersionConflictException(thing_group_name)
        attribute_payload = (thing_group_properties or {}).get('attributePayload', None)
        if attribute_payload is not None:
            if 'attributes' in attribute_payload:
                attributes = attribute_payload['attributes']
                # The group may have been created without an attribute
                # payload, so create the nested dicts on demand.
                stored_payload = thing_group.thing_group_properties.setdefault('attributePayload', {})
                if attribute_payload.get('merge', False):
                    stored_payload.setdefault('attributes', {}).update(attributes)
                else:
                    stored_payload['attributes'] = attributes
            else:
                # NOTE(review): this sets an ``attributes`` attribute on the
                # group object rather than touching thing_group_properties;
                # kept as-is, confirm intent.
                thing_group.attributes = {}
        thing_group.version = thing_group.version + 1
        return thing_group.version

    def _identify_thing_group(self, thing_group_name, thing_group_arn):
        """Resolve a thing group from its name and/or ARN.

        The name takes precedence; if both are given they must refer to the
        same group.
        """
        # identify thing group
        if thing_group_name is None and thing_group_arn is None:
            raise InvalidRequestException(
                ' Both thingGroupArn and thingGroupName are empty. Need to specify at least one of them'
            )
        if thing_group_name is not None:
            thing_group = self.describe_thing_group(thing_group_name)
            if thing_group_arn and thing_group.arn != thing_group_arn:
                raise InvalidRequestException(
                    'ThingGroupName thingGroupArn does not match specified thingGroupName in request'
                )
        elif thing_group_arn is not None:
            if thing_group_arn not in self.thing_groups:
                raise InvalidRequestException()
            thing_group = self.thing_groups[thing_group_arn]
        return thing_group

    def _identify_thing(self, thing_name, thing_arn):
        """Resolve a thing from its name and/or ARN.

        The name takes precedence; if both are given they must refer to the
        same thing.
        """
        # identify thing
        if thing_name is None and thing_arn is None:
            raise InvalidRequestException(
                'Both thingArn and thingName are empty. Need to specify at least one of them'
            )
        if thing_name is not None:
            thing = self.describe_thing(thing_name)
            if thing_arn and thing.arn != thing_arn:
                raise InvalidRequestException(
                    'ThingName thingArn does not match specified thingName in request'
                )
        elif thing_arn is not None:
            if thing_arn not in self.things:
                raise InvalidRequestException()
            thing = self.things[thing_arn]
        return thing

    def add_thing_to_thing_group(self, thing_group_name, thing_group_arn, thing_name, thing_arn):
        """Add a thing to a group; adding an existing member is a no-op."""
        thing_group = self._identify_thing_group(thing_group_name, thing_group_arn)
        thing = self._identify_thing(thing_name, thing_arn)
        if thing.arn in thing_group.things:
            # aws ignores duplicate registration
            return
        thing_group.things[thing.arn] = thing

    def remove_thing_from_thing_group(self, thing_group_name, thing_group_arn, thing_name, thing_arn):
        """Remove a thing from a group; removing a non-member is a no-op."""
        thing_group = self._identify_thing_group(thing_group_name, thing_group_arn)
        thing = self._identify_thing(thing_name, thing_arn)
        if thing.arn not in thing_group.things:
            # aws ignores non-registered thing
            return
        del thing_group.things[thing.arn]

    def list_things_in_thing_group(self, thing_group_name, recursive):
        """Return the things in a group (``recursive`` is currently ignored)."""
        thing_group = self.describe_thing_group(thing_group_name)
        return thing_group.things.values()

    def list_thing_groups_for_thing(self, thing_name):
        """Return ``groupName``/``groupArn`` dicts for each group containing the thing."""
        thing = self.describe_thing(thing_name)
        all_thing_groups = self.list_thing_groups(None, None, None)
        ret = []
        for thing_group in all_thing_groups:
            if thing.arn in thing_group.things:
                ret.append({
                    'groupName': thing_group.thing_group_name,
                    'groupArn': thing_group.arn,
                })
        return ret

    def update_thing_groups_for_thing(self, thing_name, thing_groups_to_add, thing_groups_to_remove):
        """Add the thing to, and remove it from, the named groups.

        ``None`` for either list is treated as "no groups".
        """
        thing = self.describe_thing(thing_name)
        for thing_group_name in thing_groups_to_add or ():
            thing_group = self.describe_thing_group(thing_group_name)
            self.add_thing_to_thing_group(
                thing_group.thing_group_name, None,
                thing.thing_name, None
            )
        for thing_group_name in thing_groups_to_remove or ():
            thing_group = self.describe_thing_group(thing_group_name)
            self.remove_thing_from_thing_group(
                thing_group.thing_group_name, None,
                thing.thing_name, None
            )

    # ------------------------------------------------------------------
    # jobs and job executions
    # ------------------------------------------------------------------
    def create_job(self, job_id, targets, document_source, document, description, presigned_url_config,
                   target_selection, job_executions_rollout_config, document_parameters):
        """Create a job plus one execution per target; return ``(arn, id, description)``."""
        job = FakeJob(job_id, targets, document_source, document, description, presigned_url_config, target_selection,
                      job_executions_rollout_config, document_parameters, self.region_name)
        self.jobs[job_id] = job

        for thing_arn in targets:
            # The thing name is the last path segment of the target ARN.
            thing_name = thing_arn.split(':')[-1].split('/')[-1]
            job_execution = FakeJobExecution(job_id, thing_arn)
            self.job_executions[(job_id, thing_name)] = job_execution
        return job.job_arn, job_id, description

    def describe_job(self, job_id):
        """Return the job with the given id.

        :raises ResourceNotFoundException: if there is no such job.
        """
        for job in self.jobs.values():
            if job.job_id == job_id:
                return job
        raise ResourceNotFoundException()

    def delete_job(self, job_id, force):
        """Delete a job.

        :raises InvalidStateTransitionException: if the job is
            ``IN_PROGRESS`` and ``force`` is not set.
        """
        job = self.jobs[job_id]

        if job.status == 'IN_PROGRESS' and not force:
            raise InvalidStateTransitionException()
        del self.jobs[job_id]

    def cancel_job(self, job_id, reason_code, comment, force):
        """Cancel a job and return it.

        ``reason_code``, ``comment`` and ``force`` only overwrite the stored
        values when they are not ``None``.

        :raises InvalidStateTransitionException: if the job is
            ``IN_PROGRESS`` and ``force`` is not set.
        """
        job = self.jobs[job_id]

        # Check the *current* status before changing anything; checking
        # after the status is set to CANCELED could never fail.
        if job.status == 'IN_PROGRESS' and not force:
            raise InvalidStateTransitionException()

        if reason_code is not None:
            job.reason_code = reason_code
        if comment is not None:
            job.comment = comment
        if force is not None:
            job.force = force
        job.status = 'CANCELED'

        return job

    def get_job_document(self, job_id):
        """Return the job object whose document is requested."""
        return self.jobs[job_id]

    def list_jobs(self, status, target_selection, max_results, token, thing_group_name, thing_group_id):
        """Return ``(jobs, next_token)``: one page of job dicts (filters not implemented)."""
        # TODO: implement filters
        all_jobs = [_.to_dict() for _ in self.jobs.values()]
        return self._paginate(all_jobs, max_results, token)

    def describe_job_execution(self, job_id, thing_name, execution_number):
        """Return the execution of ``job_id`` on ``thing_name``.

        :raises ResourceNotFoundException: if there is no such execution, or
            ``execution_number`` is given and does not match.
        """
        try:
            job_execution = self.job_executions[(job_id, thing_name)]
        except KeyError:
            raise ResourceNotFoundException()

        if job_execution is None or \
                (execution_number is not None and job_execution.execution_number != execution_number):
            raise ResourceNotFoundException()

        return job_execution

    def cancel_job_execution(self, job_id, thing_name, force, expected_version, status_details):
        """Cancel the execution of ``job_id`` on ``thing_name``.

        :raises ResourceNotFoundException: if there is no such execution.
        :raises InvalidStateTransitionException: if the execution is
            ``IN_PROGRESS`` and ``force`` is not set.
        """
        # .get() so that a missing execution reaches the not-found branch
        # instead of escaping as a KeyError.
        job_execution = self.job_executions.get((job_id, thing_name))

        if job_execution is None:
            raise ResourceNotFoundException()

        job_execution.force_canceled = force if force is not None else job_execution.force_canceled
        # TODO: implement expected_version and status_details (at most 10 can be specified)

        if job_execution.status == 'IN_PROGRESS' and not force:
            raise InvalidStateTransitionException()
        job_execution.status = 'CANCELED'

    def delete_job_execution(self, job_id, thing_name, execution_number, force):
        """Delete the execution of ``job_id`` on ``thing_name``.

        :raises ResourceNotFoundException: if ``execution_number`` does not match.
        :raises InvalidStateTransitionException: if the execution is
            ``IN_PROGRESS`` and ``force`` is not set.
        """
        job_execution = self.job_executions[(job_id, thing_name)]

        if job_execution.execution_number != execution_number:
            raise ResourceNotFoundException()

        if job_execution.status == 'IN_PROGRESS' and not force:
            raise InvalidStateTransitionException()
        del self.job_executions[(job_id, thing_name)]

    def _list_job_executions(self, key_index, key_value, status, max_results, next_token):
        """Return one page of execution dicts whose storage key matches.

        ``key_index`` selects the part of the ``(job_id, thing_name)`` key to
        compare with ``key_value``.
        """
        job_executions = [
            execution.to_dict()
            for key, execution in self.job_executions.items()
            if key[key_index] == key_value
        ]
        if status is not None:
            # to_dict() nests the status under 'jobExecutionSummary'.
            job_executions = [
                elem for elem in job_executions
                if elem['jobExecutionSummary']['status'] == status
            ]
        # Paginate the full filtered list so next_token reflects what is left.
        return self._paginate(job_executions, max_results, next_token)

    def list_job_executions_for_job(self, job_id, status, max_results, next_token):
        """Return ``(job_executions, next_token)`` for one job, optionally filtered by status."""
        return self._list_job_executions(0, job_id, status, max_results, next_token)

    def list_job_executions_for_thing(self, thing_name, status, max_results, next_token):
        """Return ``(job_executions, next_token)`` for one thing, optionally filtered by status."""
        return self._list_job_executions(1, thing_name, status, max_results, next_token)
# One IoTBackend per region in which boto3 reports the "iot" service,
# keyed by region name.
available_regions = boto3.session.Session().get_available_regions("iot")
iot_backends = dict(
    (region, IoTBackend(region)) for region in available_regions
)