moto/tests/test_iot/test_iot_certificates.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

303 lines
12 KiB
Python
Raw Normal View History

import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_iot
@mock_iot
def test_certificate_id_generation_deterministic():
# Creating the same certificate twice should result in the same certificate ID
client = boto3.client("iot", region_name="us-east-1")
cert1 = client.create_keys_and_certificate(setAsActive=False)
client.delete_certificate(certificateId=cert1["certificateId"])
cert2 = client.register_certificate(
certificatePem=cert1["certificatePem"], setAsActive=False
)
assert cert2["certificateId"] == cert1["certificateId"]
client.delete_certificate(certificateId=cert2["certificateId"])
2022-02-10 09:02:40 +00:00
@mock_iot
def test_create_certificate_from_csr():
csr = "-----BEGIN CERTIFICATE REQUEST-----\nMIICijCCAXICAQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3RhdGUx\nITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN\nAQEBBQADggEPADCCAQoCggEBAMSUg2mO7mYnhvYUB55K0/ay9WLLgPjOHnbduyCv\nN+udkJaZc+A65ux9LvVo33VHDTlV2Ms9H/42on902WtuS3BNuxdXfD068CpN2lb6\nbSAeuKc6Fdu4BIP2bFYKCyejqBoOmTEhYA8bOM1Wu/pRsq1PkAmcGkvw3mlRx45E\nB2LRicWcg3YEleEBGyLYohyeMu0pnlsc7zsu5T4bwrjetdbDPVbzgu0Mf/evU9hJ\nG/IisXNxQhzPh/DTQsKZSNddZ4bmkAQrRN1nmNXD6QoxBiVyjjgKGrPnX+hz4ugm\naaN9CsOO/cad1E3C0KiI0BQCjxRb80wOpI4utz4pEcY97sUCAwEAAaAAMA0GCSqG\nSIb3DQEBBQUAA4IBAQC64L4JHvwxdxmnXT9Lv12p5CGx99d7VOXQXy29b1yH9cJ+\nFaQ2TH377uOdorSCS4bK7eje9/HLsCNgqftR0EruwSNnukF695BWN8e/AJSZe0vA\n3J/llZ6G7MWuOIhCswsOxqNnM1htu3o6ujXVrgBMeMgQy2tfylWfI7SGR6UmtLYF\nZrPaqXdkpt47ROJNCm2Oht1B0J3QEOmbIp/2XMxrfknzwH6se/CjuliiXVPYxrtO\n5hbZcRqjhugb8FWtaLirqh3Q3+1UIJ+CW0ZczsblP7DNdqqt8YQZpWVIqR64mSXV\nAjq/cupsJST9fey8chcNSTt4nKxOGs3OgXu1ftgy\n-----END CERTIFICATE REQUEST-----\n"
client = boto3.client("iot", region_name="us-east-2")
resp = client.create_certificate_from_csr(certificateSigningRequest=csr)
assert "certificateArn" in resp
assert "certificateId" in resp
assert "certificatePem" in resp
2022-02-10 09:02:40 +00:00
# Can create certificate a second time
client.create_certificate_from_csr(certificateSigningRequest=csr)
assert len(client.list_certificates()["certificates"]) == 2
2022-02-10 09:02:40 +00:00
@mock_iot
def test_create_key_and_certificate():
client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=True)
assert cert["certificateArn"] is not None
assert cert["certificateId"] is not None
assert cert["certificatePem"].startswith("-----BEGIN CERTIFICATE-----")
assert cert["keyPair"]["PublicKey"].startswith("-----BEGIN PUBLIC KEY-----")
assert cert["keyPair"]["PrivateKey"].startswith("-----BEGIN RSA PRIVATE KEY-----")
@mock_iot
def test_describe_certificate_by_id():
client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=True)
cert_id = cert["certificateId"]
cert = client.describe_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"]
assert cert_desc["certificateArn"] is not None
assert cert_desc["certificateId"] is not None
assert cert_desc["certificatePem"] is not None
assert cert_desc["validity"] is not None
validity = cert_desc["validity"]
assert validity["notBefore"] is not None
assert validity["notAfter"] is not None
assert cert_desc["status"] == "ACTIVE"
@mock_iot
def test_list_certificates():
client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=True)
cert_id = cert["certificateId"]
res = client.list_certificates()
for cert in res["certificates"]:
assert cert["certificateArn"] is not None
assert cert["certificateId"] is not None
assert cert["status"] is not None
assert cert["creationDate"] is not None
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
cert = client.describe_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"]
assert cert_desc["status"] == "REVOKED"
@mock_iot
def test_update_certificate():
client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=True)
cert_id = cert["certificateId"]
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
cert = client.describe_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"]
assert cert_desc["status"] == "REVOKED"
@pytest.mark.parametrize("status", ["REVOKED", "INACTIVE"])
@mock_iot
def test_delete_certificate_with_status(status):
client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=True)
cert_id = cert["certificateId"]
# Ensure certificate has the right status before we can delete
client.update_certificate(certificateId=cert_id, newStatus=status)
client.delete_certificate(certificateId=cert_id)
res = client.list_certificates()
assert res["certificates"] == []
@mock_iot
def test_register_certificate_without_ca():
client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=True)
cert_id = cert["certificateId"]
cert_pem = cert["certificatePem"]
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
client.delete_certificate(certificateId=cert_id)
# Test register_certificate without CA flow
cert = client.register_certificate_without_ca(
certificatePem=cert_pem, status="INACTIVE"
)
assert cert["certificateId"] is not None
assert cert["certificateArn"] is not None
cert_id = cert["certificateId"]
res = client.list_certificates()
assert len(res["certificates"]) == 1
for cert in res["certificates"]:
assert cert["certificateArn"] is not None
assert cert["certificateId"] is not None
assert cert["status"] is not None
assert cert["creationDate"] is not None
client.delete_certificate(certificateId=cert_id)
res = client.list_certificates()
assert "certificates" in res
@mock_iot
def test_create_certificate_validation():
# Test we can't create a cert that already exists
client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=False)
with pytest.raises(ClientError) as e:
client.register_certificate(
certificatePem=cert["certificatePem"], setAsActive=False
)
assert (
"The certificate is already provisioned or registered"
in e.value.response["Error"]["Message"]
)
with pytest.raises(ClientError) as e:
client.register_certificate_without_ca(
certificatePem=cert["certificatePem"], status="ACTIVE"
)
assert e.value.response["resourceArn"] == cert["certificateArn"]
assert e.value.response["resourceId"] == cert["certificateId"]
assert (
"The certificate is already provisioned or registered"
in e.value.response["Error"]["Message"]
)
@mock_iot
def test_delete_certificate_validation():
doc = """{
"Version": "2012-10-17",
"Statement":[
{
"Effect":"Allow",
"Action":[
"iot: *"
],
"Resource":"*"
}
]
}
"""
client = boto3.client("iot", region_name="ap-northeast-1")
cert = client.create_keys_and_certificate(setAsActive=True)
cert_id = cert["certificateId"]
cert_arn = cert["certificateArn"]
policy_name = "my-policy"
thing_name = "thing-1"
client.create_policy(policyName=policy_name, policyDocument=doc)
client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
client.create_thing(thingName=thing_name)
client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id)
assert (
"Certificate must be deactivated (not ACTIVE) before deletion."
in e.value.response["Error"]["Message"]
)
res = client.list_certificates()
assert len(res["certificates"]) == 1
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id)
assert (
f"Things must be detached before deletion (arn: {cert_arn})"
in e.value.response["Error"]["Message"]
)
res = client.list_certificates()
assert len(res["certificates"]) == 1
client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id)
assert (
f"Certificate policies must be detached before deletion (arn: {cert_arn})"
in e.value.response["Error"]["Message"]
)
res = client.list_certificates()
assert len(res["certificates"]) == 1
client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
client.delete_certificate(certificateId=cert_id)
res = client.list_certificates()
assert len(res["certificates"]) == 0
@mock_iot
def test_delete_certificate_force():
policy_name = "my-policy"
client = boto3.client("iot", "us-east-1")
client.create_policy(policyName=policy_name, policyDocument="doc")
cert_arn = client.create_keys_and_certificate(setAsActive=True)["certificateArn"]
cert_id = cert_arn.split("/")[-1]
client.attach_policy(policyName=policy_name, target=cert_arn)
# Forced deletion does not work if the status is ACTIVE
with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id, forceDelete=True)
err = e.value.response["Error"]
assert "Certificate must be deactivated" in err["Message"]
client.update_certificate(certificateId=cert_id, newStatus="INACTIVE")
# If does work if the status is INACTIVE, even though we still have policies attached
client.delete_certificate(certificateId=cert_id, forceDelete=True)
res = client.list_certificates()
assert len(res["certificates"]) == 0
@mock_iot
def test_delete_thing_with_certificate_validation():
client = boto3.client("iot", region_name="ap-northeast-1")
cert = client.create_keys_and_certificate(setAsActive=True)
cert_id = cert["certificateId"]
cert_arn = cert["certificateArn"]
thing_name = "thing-1"
client.create_thing(thingName=thing_name)
client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
with pytest.raises(ClientError) as e:
client.delete_thing(thingName=thing_name)
err = e.value.response["Error"]
assert err["Code"] == "InvalidRequestException"
assert (
err["Message"]
== f"Cannot delete. Thing {thing_name} is still attached to one or more principals"
)
client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
client.delete_thing(thingName=thing_name)
# Certificate still exists
res = client.list_certificates()
assert len(res["certificates"]) == 1
assert res["certificates"][0]["certificateArn"] == cert_arn
assert res["certificates"][0]["status"] == "REVOKED"
@mock_iot
def test_certs_create_inactive():
client = boto3.client("iot", region_name="ap-northeast-1")
cert = client.create_keys_and_certificate(setAsActive=False)
cert_id = cert["certificateId"]
cert = client.describe_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"]
assert cert_desc["status"] == "INACTIVE"
client.update_certificate(certificateId=cert_id, newStatus="ACTIVE")
cert = client.describe_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"]
assert cert_desc["status"] == "ACTIVE"