258 lines
10 KiB
Python
258 lines
10 KiB
Python
|
import boto3
|
||
|
import pytest
|
||
|
|
||
|
from botocore.exceptions import ClientError
|
||
|
from moto import mock_iot
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_certificate_id_generation_deterministic():
|
||
|
# Creating the same certificate twice should result in the same certificate ID
|
||
|
client = boto3.client("iot", region_name="us-east-1")
|
||
|
cert1 = client.create_keys_and_certificate(setAsActive=False)
|
||
|
client.delete_certificate(certificateId=cert1["certificateId"])
|
||
|
|
||
|
cert2 = client.register_certificate(
|
||
|
certificatePem=cert1["certificatePem"], setAsActive=False
|
||
|
)
|
||
|
cert2.should.have.key("certificateId").which.should.equal(cert1["certificateId"])
|
||
|
client.delete_certificate(certificateId=cert2["certificateId"])
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_create_key_and_certificate():
|
||
|
client = boto3.client("iot", region_name="us-east-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||
|
cert.should.have.key("certificateArn").which.should_not.be.none
|
||
|
cert.should.have.key("certificateId").which.should_not.be.none
|
||
|
cert.should.have.key("certificatePem").which.should_not.be.none
|
||
|
cert.should.have.key("keyPair")
|
||
|
cert["keyPair"].should.have.key("PublicKey").which.should_not.be.none
|
||
|
cert["keyPair"].should.have.key("PrivateKey").which.should_not.be.none
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_describe_certificate_by_id():
|
||
|
client = boto3.client("iot", region_name="us-east-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||
|
cert_id = cert["certificateId"]
|
||
|
|
||
|
cert = client.describe_certificate(certificateId=cert_id)
|
||
|
cert.should.have.key("certificateDescription")
|
||
|
cert_desc = cert["certificateDescription"]
|
||
|
cert_desc.should.have.key("certificateArn").which.should_not.be.none
|
||
|
cert_desc.should.have.key("certificateId").which.should_not.be.none
|
||
|
cert_desc.should.have.key("certificatePem").which.should_not.be.none
|
||
|
cert_desc.should.have.key("validity").which.should_not.be.none
|
||
|
validity = cert_desc["validity"]
|
||
|
validity.should.have.key("notBefore").which.should_not.be.none
|
||
|
validity.should.have.key("notAfter").which.should_not.be.none
|
||
|
cert_desc.should.have.key("status").which.should.equal("ACTIVE")
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_list_certificates():
|
||
|
client = boto3.client("iot", region_name="us-east-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||
|
cert_id = cert["certificateId"]
|
||
|
|
||
|
res = client.list_certificates()
|
||
|
for cert in res["certificates"]:
|
||
|
cert.should.have.key("certificateArn").which.should_not.be.none
|
||
|
cert.should.have.key("certificateId").which.should_not.be.none
|
||
|
cert.should.have.key("status").which.should_not.be.none
|
||
|
cert.should.have.key("creationDate").which.should_not.be.none
|
||
|
|
||
|
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
|
||
|
cert = client.describe_certificate(certificateId=cert_id)
|
||
|
cert_desc = cert["certificateDescription"]
|
||
|
cert_desc.should.have.key("status").which.should.equal("REVOKED")
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_update_certificate():
|
||
|
client = boto3.client("iot", region_name="us-east-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||
|
cert_id = cert["certificateId"]
|
||
|
|
||
|
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
|
||
|
cert = client.describe_certificate(certificateId=cert_id)
|
||
|
cert_desc = cert["certificateDescription"]
|
||
|
cert_desc.should.have.key("status").which.should.equal("REVOKED")
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize("status", ["REVOKED", "INACTIVE"])
|
||
|
@mock_iot
|
||
|
def test_delete_certificate_with_status(status):
|
||
|
client = boto3.client("iot", region_name="us-east-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||
|
cert_id = cert["certificateId"]
|
||
|
|
||
|
# Ensure certificate has the right status before we can delete
|
||
|
client.update_certificate(certificateId=cert_id, newStatus=status)
|
||
|
|
||
|
client.delete_certificate(certificateId=cert_id)
|
||
|
res = client.list_certificates()
|
||
|
res.should.have.key("certificates").equals([])
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_register_certificate_without_ca():
|
||
|
client = boto3.client("iot", region_name="us-east-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||
|
cert_id = cert["certificateId"]
|
||
|
cert_pem = cert["certificatePem"]
|
||
|
|
||
|
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
|
||
|
client.delete_certificate(certificateId=cert_id)
|
||
|
|
||
|
# Test register_certificate without CA flow
|
||
|
cert = client.register_certificate_without_ca(
|
||
|
certificatePem=cert_pem, status="INACTIVE"
|
||
|
)
|
||
|
cert.should.have.key("certificateId").which.should_not.be.none
|
||
|
cert.should.have.key("certificateArn").which.should_not.be.none
|
||
|
cert_id = cert["certificateId"]
|
||
|
|
||
|
res = client.list_certificates()
|
||
|
res.should.have.key("certificates").which.should.have.length_of(1)
|
||
|
for cert in res["certificates"]:
|
||
|
cert.should.have.key("certificateArn").which.should_not.be.none
|
||
|
cert.should.have.key("certificateId").which.should_not.be.none
|
||
|
cert.should.have.key("status").which.should_not.be.none
|
||
|
cert.should.have.key("creationDate").which.should_not.be.none
|
||
|
|
||
|
client.delete_certificate(certificateId=cert_id)
|
||
|
res = client.list_certificates()
|
||
|
res.should.have.key("certificates")
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_create_certificate_validation():
|
||
|
# Test we can't create a cert that already exists
|
||
|
client = boto3.client("iot", region_name="us-east-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=False)
|
||
|
|
||
|
with pytest.raises(ClientError) as e:
|
||
|
client.register_certificate(
|
||
|
certificatePem=cert["certificatePem"], setAsActive=False
|
||
|
)
|
||
|
e.value.response["Error"]["Message"].should.contain(
|
||
|
"The certificate is already provisioned or registered"
|
||
|
)
|
||
|
|
||
|
with pytest.raises(ClientError) as e:
|
||
|
client.register_certificate_without_ca(
|
||
|
certificatePem=cert["certificatePem"], status="ACTIVE"
|
||
|
)
|
||
|
e.value.response["Error"]["Message"].should.contain(
|
||
|
"The certificate is already provisioned or registered"
|
||
|
)
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_delete_certificate_validation():
|
||
|
doc = """{
|
||
|
"Version": "2012-10-17",
|
||
|
"Statement":[
|
||
|
{
|
||
|
"Effect":"Allow",
|
||
|
"Action":[
|
||
|
"iot: *"
|
||
|
],
|
||
|
"Resource":"*"
|
||
|
}
|
||
|
]
|
||
|
}
|
||
|
"""
|
||
|
client = boto3.client("iot", region_name="ap-northeast-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||
|
cert_id = cert["certificateId"]
|
||
|
cert_arn = cert["certificateArn"]
|
||
|
policy_name = "my-policy"
|
||
|
thing_name = "thing-1"
|
||
|
client.create_policy(policyName=policy_name, policyDocument=doc)
|
||
|
client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||
|
client.create_thing(thingName=thing_name)
|
||
|
client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
|
||
|
|
||
|
with pytest.raises(ClientError) as e:
|
||
|
client.delete_certificate(certificateId=cert_id)
|
||
|
e.value.response["Error"]["Message"].should.contain(
|
||
|
"Certificate must be deactivated (not ACTIVE) before deletion."
|
||
|
)
|
||
|
res = client.list_certificates()
|
||
|
res.should.have.key("certificates").which.should.have.length_of(1)
|
||
|
|
||
|
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
|
||
|
with pytest.raises(ClientError) as e:
|
||
|
client.delete_certificate(certificateId=cert_id)
|
||
|
e.value.response["Error"]["Message"].should.contain(
|
||
|
"Things must be detached before deletion (arn: %s)" % cert_arn
|
||
|
)
|
||
|
res = client.list_certificates()
|
||
|
res.should.have.key("certificates").which.should.have.length_of(1)
|
||
|
|
||
|
client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
|
||
|
with pytest.raises(ClientError) as e:
|
||
|
client.delete_certificate(certificateId=cert_id)
|
||
|
e.value.response["Error"]["Message"].should.contain(
|
||
|
"Certificate policies must be detached before deletion (arn: %s)" % cert_arn
|
||
|
)
|
||
|
res = client.list_certificates()
|
||
|
res.should.have.key("certificates").which.should.have.length_of(1)
|
||
|
|
||
|
client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
|
||
|
client.delete_certificate(certificateId=cert_id)
|
||
|
res = client.list_certificates()
|
||
|
res.should.have.key("certificates").which.should.have.length_of(0)
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_delete_thing_with_certificate_validation():
|
||
|
client = boto3.client("iot", region_name="ap-northeast-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=True)
|
||
|
cert_id = cert["certificateId"]
|
||
|
cert_arn = cert["certificateArn"]
|
||
|
thing_name = "thing-1"
|
||
|
client.create_thing(thingName=thing_name)
|
||
|
client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
|
||
|
|
||
|
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
|
||
|
|
||
|
with pytest.raises(ClientError) as e:
|
||
|
client.delete_thing(thingName=thing_name)
|
||
|
err = e.value.response["Error"]
|
||
|
err["Code"].should.equal("InvalidRequestException")
|
||
|
err["Message"].should.equals(
|
||
|
f"Cannot delete. Thing {thing_name} is still attached to one or more principals"
|
||
|
)
|
||
|
|
||
|
client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
|
||
|
|
||
|
client.delete_thing(thingName=thing_name)
|
||
|
|
||
|
# Certificate still exists
|
||
|
res = client.list_certificates()
|
||
|
res.should.have.key("certificates").which.should.have.length_of(1)
|
||
|
res["certificates"][0]["certificateArn"].should.equal(cert_arn)
|
||
|
res["certificates"][0]["status"].should.equal("REVOKED")
|
||
|
|
||
|
|
||
|
@mock_iot
|
||
|
def test_certs_create_inactive():
|
||
|
client = boto3.client("iot", region_name="ap-northeast-1")
|
||
|
cert = client.create_keys_and_certificate(setAsActive=False)
|
||
|
cert_id = cert["certificateId"]
|
||
|
|
||
|
cert = client.describe_certificate(certificateId=cert_id)
|
||
|
cert.should.have.key("certificateDescription")
|
||
|
cert_desc = cert["certificateDescription"]
|
||
|
cert_desc.should.have.key("status").which.should.equal("INACTIVE")
|
||
|
|
||
|
client.update_certificate(certificateId=cert_id, newStatus="ACTIVE")
|
||
|
cert = client.describe_certificate(certificateId=cert_id)
|
||
|
cert.should.have.key("certificateDescription")
|
||
|
cert_desc = cert["certificateDescription"]
|
||
|
cert_desc.should.have.key("status").which.should.equal("ACTIVE")
|