Add support for IoT register_certificate (#1827)

This commit is contained in:
Adam Hodges 2019-05-25 13:30:23 -04:00 committed by Terry Cain
parent 4cce4defac
commit 71f83d7e4f
4 changed files with 56 additions and 6 deletions

View File

@ -2382,7 +2382,7 @@
- [ ] unsubscribe_from_event
- [ ] update_assessment_target
## iot - 32% implemented
## iot - 33% implemented
- [ ] accept_certificate_transfer
- [X] add_thing_to_thing_group
- [ ] associate_targets_with_job
@ -2480,7 +2480,7 @@
- [ ] list_topic_rules
- [ ] list_v2_logging_levels
- [ ] register_ca_certificate
- [ ] register_certificate
- [X] register_certificate
- [ ] register_thing
- [ ] reject_certificate_transfer
- [X] remove_thing_from_thing_group

View File

@ -96,7 +96,7 @@ class FakeThingGroup(BaseModel):
class FakeCertificate(BaseModel):
def __init__(self, certificate_pem, status, region_name):
def __init__(self, certificate_pem, status, region_name, ca_certificate_pem=None):
m = hashlib.sha256()
m.update(str(uuid.uuid4()).encode('utf-8'))
self.certificate_id = m.hexdigest()
@ -109,12 +109,18 @@ class FakeCertificate(BaseModel):
self.transfer_data = {}
self.creation_date = time.time()
self.last_modified_date = self.creation_date
self.ca_certificate_id = None
self.ca_certificate_pem = ca_certificate_pem
if ca_certificate_pem:
m.update(str(uuid.uuid4()).encode('utf-8'))
self.ca_certificate_id = m.hexdigest()
def to_dict(self):
return {
'certificateArn': self.arn,
'certificateId': self.certificate_id,
'caCertificateId': self.ca_certificate_id,
'status': self.status,
'creationDate': self.creation_date
}
@ -410,6 +416,12 @@ class IoTBackend(BaseBackend):
def list_certificates(self):
return self.certificates.values()
def register_certificate(self, certificate_pem, ca_certificate_pem, set_as_active, status):
certificate = FakeCertificate(certificate_pem, 'ACTIVE' if set_as_active else status,
self.region_name, ca_certificate_pem)
self.certificates[certificate.certificate_id] = certificate
return certificate
def update_certificate(self, certificate_id, new_status):
cert = self.describe_certificate(certificate_id)
# TODO: validate new_status

View File

@ -183,6 +183,20 @@ class IoTResponse(BaseResponse):
# TODO: implement pagination in the future
return json.dumps(dict(certificates=[_.to_dict() for _ in certificates]))
def register_certificate(self):
certificate_pem = self._get_param("certificatePem")
ca_certificate_pem = self._get_param("caCertificatePem")
set_as_active = self._get_bool_param("setAsActive")
status = self._get_param("status")
cert = self.iot_backend.register_certificate(
certificate_pem=certificate_pem,
ca_certificate_pem=ca_certificate_pem,
set_as_active=set_as_active,
status=status
)
return json.dumps(dict(certificateId=cert.certificate_id, certificateArn=cert.arn))
def update_certificate(self):
certificate_id = self._get_param("certificateId")
new_status = self._get_param("newStatus")

View File

@ -228,7 +228,7 @@ def test_list_things_with_attribute_and_thing_type_filter_and_next_token():
@mock_iot
def test_certs():
client = boto3.client('iot', region_name='ap-northeast-1')
client = boto3.client('iot', region_name='us-east-1')
cert = client.create_keys_and_certificate(setAsActive=True)
cert.should.have.key('certificateArn').which.should_not.be.none
cert.should.have.key('certificateId').which.should_not.be.none
@ -245,6 +245,29 @@ def test_certs():
cert_desc.should.have.key('certificateId').which.should_not.be.none
cert_desc.should.have.key('certificatePem').which.should_not.be.none
cert_desc.should.have.key('status').which.should.equal('ACTIVE')
cert_pem = cert_desc['certificatePem']
res = client.list_certificates()
for cert in res['certificates']:
cert.should.have.key('certificateArn').which.should_not.be.none
cert.should.have.key('certificateId').which.should_not.be.none
cert.should.have.key('status').which.should_not.be.none
cert.should.have.key('creationDate').which.should_not.be.none
client.update_certificate(certificateId=cert_id, newStatus='REVOKED')
cert = client.describe_certificate(certificateId=cert_id)
cert_desc = cert['certificateDescription']
cert_desc.should.have.key('status').which.should.equal('REVOKED')
client.delete_certificate(certificateId=cert_id)
res = client.list_certificates()
res.should.have.key('certificates')
# Test register_certificate flow
cert = client.register_certificate(certificatePem=cert_pem, setAsActive=True)
cert.should.have.key('certificateId').which.should_not.be.none
cert.should.have.key('certificateArn').which.should_not.be.none
cert_id = cert['certificateId']
res = client.list_certificates()
res.should.have.key('certificates').which.should.have.length_of(1)
@ -256,11 +279,12 @@ def test_certs():
client.update_certificate(certificateId=cert_id, newStatus='REVOKED')
cert = client.describe_certificate(certificateId=cert_id)
cert_desc.should.have.key('status').which.should.equal('ACTIVE')
cert_desc = cert['certificateDescription']
cert_desc.should.have.key('status').which.should.equal('REVOKED')
client.delete_certificate(certificateId=cert_id)
res = client.list_certificates()
res.should.have.key('certificates').which.should.have.length_of(0)
res.should.have.key('certificates')
@mock_iot