Make IoT certificate ID generation deterministic and prevent duplicate certificates from being created (#3331)

* Make IoT certificate id generation deterministic

Fixes #3321

As per https://stackoverflow.com/questions/55847788/how-does-aws-iot-generate-a-certificate-id,
the IoT certificate ID is the SHA256 fingerprint of the
certificate. Since moto doesn't generate full certificates we will
instead use the SHA256 hash of the passed certificate pem.

* Don't allow duplicate IoT certificates to be created

Fixes #3320

When using boto3, trying to register a certificate that already
exists will throw a ResourceAlreadyExistsException. Moto should
follow the same pattern to allow testing error handling code in
this area.
This commit is contained in:
Ben Dennerley 2020-09-22 05:28:12 -04:00 committed by GitHub
parent 672a4b9a27
commit 958e95cf5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 2 deletions

View File

@ -52,3 +52,11 @@ class DeleteConflictException(IoTClientError):
def __init__(self, msg):
self.code = 409
super(DeleteConflictException, self).__init__("DeleteConflictException", msg)
class ResourceAlreadyExistsException(IoTClientError):
def __init__(self, msg):
self.code = 409
super(ResourceAlreadyExistsException, self).__init__(
"ResourceAlreadyExistsException", msg or "The resource already exists."
)

View File

@ -19,6 +19,7 @@ from .exceptions import (
InvalidRequestException,
InvalidStateTransitionException,
VersionConflictException,
ResourceAlreadyExistsException,
)
from moto.utilities.utils import random_string
@ -130,7 +131,7 @@ class FakeThingGroup(BaseModel):
class FakeCertificate(BaseModel):
def __init__(self, certificate_pem, status, region_name, ca_certificate_pem=None):
m = hashlib.sha256()
m.update(str(uuid.uuid4()).encode("utf-8"))
m.update(certificate_pem.encode("utf-8"))
self.certificate_id = m.hexdigest()
self.arn = "arn:aws:iot:%s:1:cert/%s" % (region_name, self.certificate_id)
self.certificate_pem = certificate_pem
@ -145,7 +146,7 @@ class FakeCertificate(BaseModel):
self.ca_certificate_id = None
self.ca_certificate_pem = ca_certificate_pem
if ca_certificate_pem:
m.update(str(uuid.uuid4()).encode("utf-8"))
m.update(ca_certificate_pem.encode("utf-8"))
self.ca_certificate_id = m.hexdigest()
def to_dict(self):
@ -668,6 +669,12 @@ class IoTBackend(BaseBackend):
def list_certificates(self):
return self.certificates.values()
def __raise_if_certificate_already_exists(self, certificate_id):
if certificate_id in self.certificates:
raise ResourceAlreadyExistsException(
"The certificate is already provisioned or registered"
)
def register_certificate(
self, certificate_pem, ca_certificate_pem, set_as_active, status
):
@ -677,11 +684,15 @@ class IoTBackend(BaseBackend):
self.region_name,
ca_certificate_pem,
)
self.__raise_if_certificate_already_exists(certificate.certificate_id)
self.certificates[certificate.certificate_id] = certificate
return certificate
def register_certificate_without_ca(self, certificate_pem, status):
certificate = FakeCertificate(certificate_pem, status, self.region_name)
self.__raise_if_certificate_already_exists(certificate.certificate_id)
self.certificates[certificate.certificate_id] = certificate
return certificate

View File

@ -503,6 +503,20 @@ def test_endpoints():
raise Exception("Should have raised error")
@mock_iot
def test_certificate_id_generation_deterministic():
# Creating the same certificate twice should result in the same certificate ID
client = boto3.client("iot", region_name="us-east-1")
cert1 = client.create_keys_and_certificate(setAsActive=False)
client.delete_certificate(certificateId=cert1["certificateId"])
cert2 = client.register_certificate(
certificatePem=cert1["certificatePem"], setAsActive=False
)
cert2.should.have.key("certificateId").which.should.equal(cert1["certificateId"])
client.delete_certificate(certificateId=cert2["certificateId"])
@mock_iot
def test_certs():
client = boto3.client("iot", region_name="us-east-1")
@ -584,6 +598,29 @@ def test_certs():
res.should.have.key("certificates")
@mock_iot
def test_create_certificate_validation():
# Test we can't create a cert that already exists
client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=False)
with assert_raises(ClientError) as e:
client.register_certificate(
certificatePem=cert["certificatePem"], setAsActive=False
)
e.exception.response["Error"]["Message"].should.contain(
"The certificate is already provisioned or registered"
)
with assert_raises(ClientError) as e:
client.register_certificate_without_ca(
certificatePem=cert["certificatePem"], status="ACTIVE"
)
e.exception.response["Error"]["Message"].should.contain(
"The certificate is already provisioned or registered"
)
@mock_iot
def test_delete_policy_validation():
doc = """{