From 8bf55cbe0e9000bc9ec08281f8442a2cffa522ad Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 18 Mar 2023 09:36:26 -0100 Subject: [PATCH] IOT: delete_certificate() now supports the forceDelete-parameter (#6085) --- moto/iot/models.py | 8 ++++---- moto/iot/responses.py | 3 ++- tests/test_iot/test_iot_certificates.py | 25 +++++++++++++++++++++++++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/moto/iot/models.py b/moto/iot/models.py index f6175ed69..68656778b 100644 --- a/moto/iot/models.py +++ b/moto/iot/models.py @@ -857,12 +857,12 @@ class IoTBackend(BaseBackend): self._validation_delete(cert) del self.ca_certificates[certificate_id] - def delete_certificate(self, certificate_id): + def delete_certificate(self, certificate_id, force_delete): cert = self.describe_certificate(certificate_id) - self._validation_delete(cert) + self._validation_delete(cert, force_delete) del self.certificates[certificate_id] - def _validation_delete(self, cert): + def _validation_delete(self, cert, force_delete: bool = False): if cert.status == "ACTIVE": raise CertificateStateException( "Certificate must be deactivated (not ACTIVE) before deletion.", @@ -884,7 +884,7 @@ class IoTBackend(BaseBackend): for k, v in self.principal_policies.items() if self._get_principal(k[0]).certificate_id == cert.certificate_id ] - if len(certs) > 0: + if len(certs) > 0 and not force_delete: raise DeleteConflictException( "Certificate policies must be detached before deletion (arn: %s)" % certs[0] diff --git a/moto/iot/responses.py b/moto/iot/responses.py index d9cf2b08d..5138677cc 100644 --- a/moto/iot/responses.py +++ b/moto/iot/responses.py @@ -304,7 +304,8 @@ class IoTResponse(BaseResponse): def delete_certificate(self): certificate_id = self._get_param("certificateId") - self.iot_backend.delete_certificate(certificate_id=certificate_id) + force_delete = self._get_bool_param("forceDelete", False) + self.iot_backend.delete_certificate(certificate_id, force_delete) return json.dumps(dict()) def describe_ca_certificate(self): diff --git a/tests/test_iot/test_iot_certificates.py b/tests/test_iot/test_iot_certificates.py index 729cddbe2..aafc40bfe 100644 --- a/tests/test_iot/test_iot_certificates.py +++ b/tests/test_iot/test_iot_certificates.py @@ -232,6 +232,31 @@ def test_delete_certificate_validation(): res.should.have.key("certificates").which.should.have.length_of(0) +@mock_iot +def test_delete_certificate_force(): + policy_name = "my-policy" + client = boto3.client("iot", "us-east-1") + client.create_policy(policyName=policy_name, policyDocument="doc") + + cert_arn = client.create_keys_and_certificate(setAsActive=True)["certificateArn"] + cert_id = cert_arn.split("/")[-1] + + client.attach_policy(policyName=policy_name, target=cert_arn) + + # Forced deletion does not work if the status is ACTIVE + with pytest.raises(ClientError) as e: + client.delete_certificate(certificateId=cert_id, forceDelete=True) + err = e.value.response["Error"] + err["Message"].should.contain("Certificate must be deactivated") + + client.update_certificate(certificateId=cert_id, newStatus="INACTIVE") + # If does work if the status is INACTIVE, even though we still have policies attached + client.delete_certificate(certificateId=cert_id, forceDelete=True) + + res = client.list_certificates() + res.should.have.key("certificates").which.should.have.length_of(0) + + @mock_iot def test_delete_thing_with_certificate_validation(): client = boto3.client("iot", region_name="ap-northeast-1")