IoT: Generate valid keys and certificates (#5801)

This commit is contained in:
Viren Nadkarni 2022-12-22 22:19:14 +05:30 committed by GitHub
parent f5fbddec86
commit a53f620846
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 8 deletions

View File

@ -3,6 +3,7 @@ import re
import time import time
from collections import OrderedDict from collections import OrderedDict
from cryptography import x509 from cryptography import x509
from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives import serialization, hashes
@ -598,13 +599,13 @@ class IoTBackend(BaseBackend):
pem, ca_certificate_pem=None, set_as_active=set_as_active, status="INACTIVE" pem, ca_certificate_pem=None, set_as_active=set_as_active, status="INACTIVE"
) )
def _generate_certificate_pem(self, domain_name, subject): def _generate_certificate_pem(self, domain_name, subject, key=None):
sans = set() sans = set()
sans.add(domain_name) sans.add(domain_name)
sans = [x509.DNSName(item) for item in sans] sans = [x509.DNSName(item) for item in sans]
key = rsa.generate_private_key( key = key or rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend() public_exponent=65537, key_size=2048, backend=default_backend()
) )
issuer = x509.Name( issuer = x509.Name(
@ -822,11 +823,28 @@ class IoTBackend(BaseBackend):
def create_keys_and_certificate(self, set_as_active): def create_keys_and_certificate(self, set_as_active):
# implement here # implement here
# caCertificate can be blank # caCertificate can be blank
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
key_pair = { key_pair = {
"PublicKey": random.get_random_string(), "PublicKey": private_key.public_key()
"PrivateKey": random.get_random_string(), .public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
.decode("utf-8"),
"PrivateKey": private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8"),
} }
certificate_pem = random.get_random_string() subject = x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, "AWS IoT Certificate")]
)
certificate_pem = self._generate_certificate_pem(
"getmoto.org", subject, key=private_key
)
status = "ACTIVE" if set_as_active else "INACTIVE" status = "ACTIVE" if set_as_active else "INACTIVE"
certificate = FakeCertificate( certificate = FakeCertificate(
certificate_pem, status, self.account_id, self.region_name certificate_pem, status, self.account_id, self.region_name

View File

@ -41,10 +41,16 @@ def test_create_key_and_certificate():
cert = client.create_keys_and_certificate(setAsActive=True) cert = client.create_keys_and_certificate(setAsActive=True)
cert.should.have.key("certificateArn").which.should_not.be.none cert.should.have.key("certificateArn").which.should_not.be.none
cert.should.have.key("certificateId").which.should_not.be.none cert.should.have.key("certificateId").which.should_not.be.none
cert.should.have.key("certificatePem").which.should_not.be.none cert.should.have.key("certificatePem").which.should.match(
r"^-----BEGIN CERTIFICATE-----"
)
cert.should.have.key("keyPair") cert.should.have.key("keyPair")
cert["keyPair"].should.have.key("PublicKey").which.should_not.be.none cert["keyPair"].should.have.key("PublicKey").which.should.match(
cert["keyPair"].should.have.key("PrivateKey").which.should_not.be.none r"^-----BEGIN PUBLIC KEY-----"
)
cert["keyPair"].should.have.key("PrivateKey").which.should.match(
r"^-----BEGIN RSA PRIVATE KEY-----"
)
@mock_iot @mock_iot