Update models.py

This commit is contained in:
Stephan Huber 2019-12-23 08:46:37 +01:00
parent 0527e88d46
commit a6aa0f6dbf

View File

@ -30,7 +30,7 @@ class FakeThing(BaseModel):
self.attributes = attributes
self.arn = "arn:aws:iot:%s:1:thing/%s" % (self.region_name, thing_name)
self.version = 1
# TODO: we need to handle 'version'?
# TODO: we need to handle "version"?
# for iot-data
self.thing_shadow = None
@ -97,7 +97,7 @@ class FakeThingGroup(BaseModel):
break
# if parent arn found (should always be found)
if parent_thing_group_structure:
# copy parent's rootToParentThingGroups
# copy parent"s rootToParentThingGroups
if "rootToParentThingGroups" in parent_thing_group_structure.metadata:
self.metadata["rootToParentThingGroups"].extend(
parent_thing_group_structure.metadata["rootToParentThingGroups"]
@ -175,27 +175,27 @@ class FakeCertificate(BaseModel):
class FakePolicy(BaseModel):
def __init__(self, name, document, region_name, default_version_id='1'):
def __init__(self, name, document, region_name, default_version_id="1"):
self.name = name
self.document = document
self.arn = 'arn:aws:iot:%s:1:policy/%s' % (region_name, name)
self.arn = "arn:aws:iot:%s:1:policy/%s" % (region_name, name)
self.default_version_id = default_version_id
self.versions = [FakePolicyVersion(self.name, document, True, region_name)]
def to_get_dict(self):
return {
'policyName': self.name,
'policyArn': self.arn,
'policyDocument': self.document,
'defaultVersionId': self.default_version_id
"policyName": self.name,
"policyArn": self.arn,
"policyDocument": self.document,
"defaultVersionId": self.default_version_id
}
def to_dict_at_creation(self):
return {
'policyName': self.name,
'policyArn': self.arn,
'policyDocument': self.document,
'policyVersionId': self.default_version_id
"policyName": self.name,
"policyArn": self.arn,
"policyDocument": self.document,
"policyVersionId": self.default_version_id
}
def to_dict(self):
@ -210,39 +210,39 @@ class FakePolicyVersion(object):
is_default,
region_name):
self.name = policy_name
self.arn = 'arn:aws:iot:%s:1:policy/%s' % (region_name, policy_name)
self.arn = "arn:aws:iot:%s:1:policy/%s" % (region_name, policy_name)
self.document = document or {}
self.is_default = is_default
self.version_id = '1'
self.version_id = "1"
self.create_datetime = time.mktime(datetime(2015, 1, 1).timetuple())
self.last_modified_datetime = time.mktime(datetime(2015, 1, 2).timetuple())
def to_get_dict(self):
return {
'policyName': self.name,
'policyArn': self.arn,
'policyDocument': self.document,
'policyVersionId': self.version_id,
'isDefaultVersion': self.is_default,
'creationDate': self.create_datetime,
'lastModifiedDate': self.last_modified_datetime,
'generationId': self.version_id
"policyName": self.name,
"policyArn": self.arn,
"policyDocument": self.document,
"policyVersionId": self.version_id,
"isDefaultVersion": self.is_default,
"creationDate": self.create_datetime,
"lastModifiedDate": self.last_modified_datetime,
"generationId": self.version_id
}
def to_dict_at_creation(self):
return {
'policyArn': self.arn,
'policyDocument': self.document,
'policyVersionId': self.version_id,
'isDefaultVersion': self.is_default
"policyArn": self.arn,
"policyDocument": self.document,
"policyVersionId": self.version_id,
"isDefaultVersion": self.is_default
}
def to_dict(self):
return {
'versionId': self.version_id,
'isDefaultVersion': self.is_default,
'createDate': self.create_datetime,
"versionId": self.version_id,
"isDefaultVersion": self.is_default,
"createDate": self.create_datetime,
}
@ -277,7 +277,7 @@ class FakeJob(BaseModel):
self.presigned_url_config = presigned_url_config
self.target_selection = target_selection
self.job_executions_rollout_config = job_executions_rollout_config
self.status = 'QUEUED' # IN_PROGRESS | CANCELED | COMPLETED
self.status = "QUEUED" # IN_PROGRESS | CANCELED | COMPLETED
self.comment = None
self.reason_code = None
self.created_at = time.mktime(datetime(2015, 1, 1).timetuple())
@ -297,24 +297,24 @@ class FakeJob(BaseModel):
def to_dict(self):
obj = {
'jobArn': self.job_arn,
'jobId': self.job_id,
'targets': self.targets,
'description': self.description,
'presignedUrlConfig': self.presigned_url_config,
'targetSelection': self.target_selection,
'jobExecutionsRolloutConfig': self.job_executions_rollout_config,
'status': self.status,
'comment': self.comment,
'forceCanceled': self.force,
'reasonCode': self.reason_code,
'createdAt': self.created_at,
'lastUpdatedAt': self.last_updated_at,
'completedAt': self.completed_at,
'jobProcessDetails': self.job_process_details,
'documentParameters': self.document_parameters,
'document': self.document,
'documentSource': self.document_source
"jobArn": self.job_arn,
"jobId": self.job_id,
"targets": self.targets,
"description": self.description,
"presignedUrlConfig": self.presigned_url_config,
"targetSelection": self.target_selection,
"jobExecutionsRolloutConfig": self.job_executions_rollout_config,
"status": self.status,
"comment": self.comment,
"forceCanceled": self.force,
"reasonCode": self.reason_code,
"createdAt": self.created_at,
"lastUpdatedAt": self.last_updated_at,
"completedAt": self.completed_at,
"jobProcessDetails": self.job_process_details,
"documentParameters": self.document_parameters,
"document": self.document,
"documentSource": self.document_source
}
return obj
@ -327,7 +327,7 @@ class FakeJob(BaseModel):
class FakeJobExecution(BaseModel):
def __init__(self, job_id, thing_arn, status='QUEUED', force_canceled=False, status_details_map={}):
def __init__(self, job_id, thing_arn, status="QUEUED", force_canceled=False, status_details_map={}):
self.job_id = job_id
self.status = status # IN_PROGRESS | CANCELED | COMPLETED
self.force_canceled = force_canceled
@ -342,31 +342,31 @@ class FakeJobExecution(BaseModel):
def to_get_dict(self):
obj = {
'jobId': self.job_id,
'status': self.status,
'forceCanceled': self.force_canceled,
'statusDetails': {'detailsMap': self.status_details_map},
'thingArn': self.thing_arn,
'queuedAt': self.queued_at,
'startedAt': self.started_at,
'lastUpdatedAt': self.last_updated_at,
'executionNumber': self.execution_number,
'versionNumber': self.version_number,
'approximateSecondsBeforeTimedOut': self.approximate_seconds_before_time_out
"jobId": self.job_id,
"status": self.status,
"forceCanceled": self.force_canceled,
"statusDetails": {"detailsMap": self.status_details_map},
"thingArn": self.thing_arn,
"queuedAt": self.queued_at,
"startedAt": self.started_at,
"lastUpdatedAt": self.last_updated_at,
"executionNumber": self.execution_number,
"versionNumber": self.version_number,
"approximateSecondsBeforeTimedOut": self.approximate_seconds_before_time_out
}
return obj
def to_dict(self):
obj = {
'jobId': self.job_id,
'thingArn': self.thing_arn,
'jobExecutionSummary': {
'status': self.status,
'queuedAt': self.queued_at,
'startedAt': self.started_at,
'lastUpdatedAt': self.last_updated_at,
'executionNumber': self.execution_number,
"jobId": self.job_id,
"thingArn": self.thing_arn,
"jobExecutionSummary": {
"status": self.status,
"queuedAt": self.queued_at,
"startedAt": self.started_at,
"lastUpdatedAt": self.last_updated_at,
"executionNumber": self.execution_number,
}
}
@ -423,7 +423,7 @@ class IoTBackend(BaseBackend):
def list_thing_types(self, thing_type_name=None):
if thing_type_name:
# It's weird but thing_type_name is filtered by forward match, not complete match
# It"s weird but thing_type_name is filtered by forward match, not complete match
return [
_
for _ in self.thing_types.values()
@ -686,7 +686,7 @@ class IoTBackend(BaseBackend):
raise ResourceNotFoundException()
version = FakePolicyVersion(policy_name, policy_document, set_as_default, self.region_name)
policy.versions.append(version)
version.version_id = '{0}'.format(len(policy.versions))
version.version_id = "{0}".format(len(policy.versions))
if set_as_default:
self.set_default_policy_version(policy_name, version.version_id)
return version
@ -976,7 +976,7 @@ class IoTBackend(BaseBackend):
self.jobs[job_id] = job
for thing_arn in targets:
thing_name = thing_arn.split(':')[-1].split('/')[-1]
thing_name = thing_arn.split(":")[-1].split("/")[-1]
job_execution = FakeJobExecution(job_id, thing_arn)
self.job_executions[(job_id, thing_name)] = job_execution
return job.job_arn, job_id, description
@ -990,9 +990,9 @@ class IoTBackend(BaseBackend):
def delete_job(self, job_id, force):
job = self.jobs[job_id]
if job.status == 'IN_PROGRESS' and force:
if job.status == "IN_PROGRESS" and force:
del self.jobs[job_id]
elif job.status != 'IN_PROGRESS':
elif job.status != "IN_PROGRESS":
del self.jobs[job_id]
else:
raise InvalidStateTransitionException()
@ -1003,11 +1003,11 @@ class IoTBackend(BaseBackend):
job.reason_code = reason_code if reason_code is not None else job.reason_code
job.comment = comment if comment is not None else job.comment
job.force = force if force is not None and force != job.force else job.force
job.status = 'CANCELED'
job.status = "CANCELED"
if job.status == 'IN_PROGRESS' and force:
if job.status == "IN_PROGRESS" and force:
self.jobs[job_id] = job
elif job.status != 'IN_PROGRESS':
elif job.status != "IN_PROGRESS":
self.jobs[job_id] = job
else:
raise InvalidStateTransitionException()
@ -1053,11 +1053,11 @@ class IoTBackend(BaseBackend):
job_execution.force_canceled = force if force is not None else job_execution.force_canceled
# TODO: implement expected_version and status_details (at most 10 can be specified)
if job_execution.status == 'IN_PROGRESS' and force:
job_execution.status = 'CANCELED'
if job_execution.status == "IN_PROGRESS" and force:
job_execution.status = "CANCELED"
self.job_executions[(job_id, thing_name)] = job_execution
elif job_execution.status != 'IN_PROGRESS':
job_execution.status = 'CANCELED'
elif job_execution.status != "IN_PROGRESS":
job_execution.status = "CANCELED"
self.job_executions[(job_id, thing_name)] = job_execution
else:
raise InvalidStateTransitionException()
@ -1068,9 +1068,9 @@ class IoTBackend(BaseBackend):
if job_execution.execution_number != execution_number:
raise ResourceNotFoundException()
if job_execution.status == 'IN_PROGRESS' and force:
if job_execution.status == "IN_PROGRESS" and force:
del self.job_executions[(job_id, thing_name)]
elif job_execution.status != 'IN_PROGRESS':
elif job_execution.status != "IN_PROGRESS":
del self.job_executions[(job_id, thing_name)]
else:
raise InvalidStateTransitionException()
@ -1080,8 +1080,7 @@ class IoTBackend(BaseBackend):
if status is not None:
job_executions = list(filter(lambda elem:
status in elem["status"] and
elem["status"] == status, job_executions))
status in elem["status"] and elem["status"] == status, job_executions))
token = next_token
if token is None:
@ -1099,8 +1098,7 @@ class IoTBackend(BaseBackend):
if status is not None:
job_executions = list(filter(lambda elem:
status in elem["status"] and
elem["status"] == status, job_executions))
status in elem["status"] and elem["status"] == status, job_executions))
token = next_token
if token is None: