adding more job mocks for IoT service

This commit is contained in:
Stephan 2019-01-09 16:18:22 +01:00
parent cfd12b6d19
commit 82f476bb46
3 changed files with 138 additions and 22 deletions

View File

@ -226,12 +226,14 @@ class FakeJob(BaseModel):
self.targets = targets
self.document_source = document_source
self.document = document
self.force = False
self.description = description
self.presigned_url_config = presigned_url_config
self.target_selection = target_selection
self.job_executions_rollout_config = job_executions_rollout_config
self.status = None # IN_PROGRESS | CANCELED | COMPLETED
self.status = 'QUEUED' # IN_PROGRESS | CANCELED | COMPLETED
self.comment = None
self.reason_code = None
self.created_at = time.mktime(datetime(2015, 1, 1).timetuple())
self.last_updated_at = time.mktime(datetime(2015, 1, 1).timetuple())
self.completed_at = None
@ -258,6 +260,8 @@ class FakeJob(BaseModel):
'jobExecutionsRolloutConfig': self.job_executions_rollout_config,
'status': self.status,
'comment': self.comment,
'forceCanceled': self.force,
'reasonCode': self.reason_code,
'createdAt': self.created_at,
'lastUpdatedAt': self.last_updated_at,
'completedAt': self.completed_at,
@ -778,7 +782,7 @@ class IoTBackend(BaseBackend):
thing_name = thing_arn.split(':')[-1].split('/')[-1]
job_execution = FakeJobExecution(job_id, thing_arn)
self.job_executions[(job_id, thing_name)] = job_execution
return job.job_arn, job_id, description
return job
def describe_job(self, job_id):
jobs = [_ for _ in self.jobs.values() if _.job_id == job_id]
@ -786,6 +790,33 @@ class IoTBackend(BaseBackend):
raise ResourceNotFoundException()
return jobs[0]
def delete_job(self, job_id, force):
job = self.jobs[job_id]
if job.status == 'IN_PROGRESS' and force:
del self.jobs[job_id]
elif job.status != 'IN_PROGRESS':
del self.jobs[job_id]
else:
raise InvalidStateTransitionException()
def cancel_job(self, job_id, reason_code, comment, force):
job = self.jobs[job_id]
job.reason_code = reason_code if reason_code is not None else job.reason_code
job.comment = comment if comment is not None else job.comment
job.force = force if force is not None and force != job.force else job.force
job.status = 'CANCELED'
if job.status == 'IN_PROGRESS' and force:
self.jobs[job_id] = job
elif job.status != 'IN_PROGRESS':
self.jobs[job_id] = job
else:
raise InvalidStateTransitionException()
return job
def get_job_document(self, job_id):
return self.jobs[job_id]

View File

@ -115,7 +115,7 @@ class IoTResponse(BaseResponse):
return json.dumps(dict())
def create_job(self):
job_arn, job_id, description = self.iot_backend.create_job(
job = self.iot_backend.create_job(
job_id=self._get_param("jobId"),
targets=self._get_param("targets"),
description=self._get_param("description"),
@ -127,28 +127,33 @@ class IoTResponse(BaseResponse):
document_parameters=self._get_param("documentParameters")
)
return json.dumps(dict(jobArn=job_arn, jobId=job_id, description=description))
return json.dumps(job.to_dict())
def describe_job(self):
job = self.iot_backend.describe_job(job_id=self._get_param("jobId"))
return json.dumps(dict(
documentSource=job.document_source,
job=dict(
comment=job.comment,
completedAt=job.completed_at,
createdAt=job.created_at,
description=job.description,
documentParameters=job.document_parameters,
jobArn=job.job_arn,
jobExecutionsRolloutConfig=job.job_executions_rollout_config,
jobId=job.job_id,
jobProcessDetails=job.job_process_details,
lastUpdatedAt=job.last_updated_at,
presignedUrlConfig=job.presigned_url_config,
status=job.status,
targets=job.targets,
targetSelection=job.target_selection
)))
return json.dumps(dict(documentSource=job.document_source, job=job.to_dict()))
def delete_job(self):
job_id = self._get_param("jobId")
force = self._get_bool_param("force")
self.iot_backend.delete_job(job_id=job_id,
force=force)
return json.dumps(dict())
def cancel_job(self):
job_id = self._get_param("jobId")
reason_code = self._get_param("reasonCode")
comment = self._get_param("comment")
force = self._get_bool_param("force")
job = self.iot_backend.cancel_job(job_id=job_id,
reason_code=reason_code,
comment=comment,
force=force)
return json.dumps(job.to_dict())
def get_job_document(self):
job = self.iot_backend.get_job_document(job_id=self._get_param("jobId"))

View File

@ -872,6 +872,86 @@ def test_describe_job_1():
"maximumPerMinute").which.should.equal(10)
@mock_iot
def test_delete_job():
client = boto3.client('iot', region_name='eu-west-1')
name = "my-thing"
job_id = "TestJob"
# thing
thing = client.create_thing(thingName=name)
thing.should.have.key('thingName').which.should.equal(name)
thing.should.have.key('thingArn')
job = client.create_job(
jobId=job_id,
targets=[thing["thingArn"]],
documentSource="https://s3-eu-west-1.amazonaws.com/bucket-name/job_document.json",
presignedUrlConfig={
'roleArn': 'arn:aws:iam::1:role/service-role/iot_job_role',
'expiresInSec': 123
},
targetSelection="CONTINUOUS",
jobExecutionsRolloutConfig={
'maximumPerMinute': 10
}
)
job.should.have.key('jobId').which.should.equal(job_id)
job.should.have.key('jobArn')
job = client.describe_job(jobId=job_id)
job.should.have.key('job')
job.should.have.key('job').which.should.have.key("jobId").which.should.equal(job_id)
client.delete_job(jobId=job_id)
client.list_jobs()['jobs'].should.have.length_of(0)
@mock_iot
def test_cancel_job():
client = boto3.client('iot', region_name='eu-west-1')
name = "my-thing"
job_id = "TestJob"
# thing
thing = client.create_thing(thingName=name)
thing.should.have.key('thingName').which.should.equal(name)
thing.should.have.key('thingArn')
job = client.create_job(
jobId=job_id,
targets=[thing["thingArn"]],
documentSource="https://s3-eu-west-1.amazonaws.com/bucket-name/job_document.json",
presignedUrlConfig={
'roleArn': 'arn:aws:iam::1:role/service-role/iot_job_role',
'expiresInSec': 123
},
targetSelection="CONTINUOUS",
jobExecutionsRolloutConfig={
'maximumPerMinute': 10
}
)
job.should.have.key('jobId').which.should.equal(job_id)
job.should.have.key('jobArn')
job = client.describe_job(jobId=job_id)
job.should.have.key('job')
job.should.have.key('job').which.should.have.key("jobId").which.should.equal(job_id)
job = client.cancel_job(jobId=job_id, reasonCode='Because', comment='You are')
job.should.have.key('jobId').which.should.equal(job_id)
job.should.have.key('jobArn')
job = client.describe_job(jobId=job_id)
job.should.have.key('job')
job.should.have.key('job').which.should.have.key("jobId").which.should.equal(job_id)
job.should.have.key('job').which.should.have.key("status").which.should.equal('CANCELED')
job.should.have.key('job').which.should.have.key("forceCanceled").which.should.equal(False)
job.should.have.key('job').which.should.have.key("reasonCode").which.should.equal('Because')
job.should.have.key('job').which.should.have.key("comment").which.should.equal('You are')
@mock_iot
def test_get_job_document_with_document_source():
client = boto3.client('iot', region_name='eu-west-1')