IOT: delete_certificate() now supports the forceDelete-parameter (#6085)

This commit is contained in:
Bert Blommers 2023-03-18 09:36:26 -01:00 committed by GitHub
parent 851f0c1181
commit 8bf55cbe0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 5 deletions

View File

@ -857,12 +857,12 @@ class IoTBackend(BaseBackend):
self._validation_delete(cert)
del self.ca_certificates[certificate_id]
def delete_certificate(self, certificate_id):
def delete_certificate(self, certificate_id, force_delete):
cert = self.describe_certificate(certificate_id)
self._validation_delete(cert)
self._validation_delete(cert, force_delete)
del self.certificates[certificate_id]
def _validation_delete(self, cert):
def _validation_delete(self, cert, force_delete: bool = False):
if cert.status == "ACTIVE":
raise CertificateStateException(
"Certificate must be deactivated (not ACTIVE) before deletion.",
@ -884,7 +884,7 @@ class IoTBackend(BaseBackend):
for k, v in self.principal_policies.items()
if self._get_principal(k[0]).certificate_id == cert.certificate_id
]
if len(certs) > 0:
if len(certs) > 0 and not force_delete:
raise DeleteConflictException(
"Certificate policies must be detached before deletion (arn: %s)"
% certs[0]

View File

@ -304,7 +304,8 @@ class IoTResponse(BaseResponse):
def delete_certificate(self):
certificate_id = self._get_param("certificateId")
self.iot_backend.delete_certificate(certificate_id=certificate_id)
force_delete = self._get_bool_param("forceDelete", False)
self.iot_backend.delete_certificate(certificate_id, force_delete)
return json.dumps(dict())
def describe_ca_certificate(self):

View File

@ -232,6 +232,31 @@ def test_delete_certificate_validation():
res.should.have.key("certificates").which.should.have.length_of(0)
@mock_iot
def test_delete_certificate_force():
policy_name = "my-policy"
client = boto3.client("iot", "us-east-1")
client.create_policy(policyName=policy_name, policyDocument="doc")
cert_arn = client.create_keys_and_certificate(setAsActive=True)["certificateArn"]
cert_id = cert_arn.split("/")[-1]
client.attach_policy(policyName=policy_name, target=cert_arn)
# Forced deletion does not work if the status is ACTIVE
with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id, forceDelete=True)
err = e.value.response["Error"]
err["Message"].should.contain("Certificate must be deactivated")
client.update_certificate(certificateId=cert_id, newStatus="INACTIVE")
# If does work if the status is INACTIVE, even though we still have policies attached
client.delete_certificate(certificateId=cert_id, forceDelete=True)
res = client.list_certificates()
res.should.have.key("certificates").which.should.have.length_of(0)
@mock_iot
def test_delete_thing_with_certificate_validation():
client = boto3.client("iot", region_name="ap-northeast-1")