Techdebt: Replace sure with regular assertions in IOT (#6577)

This commit is contained in:
Bert Blommers 2023-07-31 21:50:24 +00:00 committed by GitHub
parent 38b7ffade2
commit 95dfa04691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 767 additions and 1000 deletions

View File

@ -1,4 +1,3 @@
import sure # noqa # pylint: disable=unused-import
import boto3 import boto3
from moto import mock_iot from moto import mock_iot
@ -14,37 +13,26 @@ def test_endpoints():
# iot:Data # iot:Data
endpoint = client.describe_endpoint(endpointType="iot:Data") endpoint = client.describe_endpoint(endpointType="iot:Data")
endpoint.should.have.key("endpointAddress").which.should_not.contain("ats") assert "ats" not in endpoint["endpointAddress"]
endpoint.should.have.key("endpointAddress").which.should.contain( assert f"iot.{region_name}.amazonaws.com" in endpoint["endpointAddress"]
f"iot.{region_name}.amazonaws.com"
)
# iot:Data-ATS # iot:Data-ATS
endpoint = client.describe_endpoint(endpointType="iot:Data-ATS") endpoint = client.describe_endpoint(endpointType="iot:Data-ATS")
endpoint.should.have.key("endpointAddress").which.should.contain( assert f"ats.iot.{region_name}.amazonaws.com" in endpoint["endpointAddress"]
f"ats.iot.{region_name}.amazonaws.com"
)
# iot:Data-ATS # iot:Data-ATS
endpoint = client.describe_endpoint(endpointType="iot:CredentialProvider") endpoint = client.describe_endpoint(endpointType="iot:CredentialProvider")
endpoint.should.have.key("endpointAddress").which.should.contain( assert f"credentials.iot.{region_name}.amazonaws.com" in endpoint["endpointAddress"]
f"credentials.iot.{region_name}.amazonaws.com"
)
# iot:Data-ATS # iot:Data-ATS
endpoint = client.describe_endpoint(endpointType="iot:Jobs") endpoint = client.describe_endpoint(endpointType="iot:Jobs")
endpoint.should.have.key("endpointAddress").which.should.contain( assert f"jobs.iot.{region_name}.amazonaws.com" in endpoint["endpointAddress"]
f"jobs.iot.{region_name}.amazonaws.com"
)
# raise InvalidRequestException # raise InvalidRequestException
try: with pytest.raises(ClientError) as exc:
client.describe_endpoint(endpointType="iot:Abc") client.describe_endpoint(endpointType="iot:Abc")
except client.exceptions.InvalidRequestException as exc: err = exc.value.response["Error"]
error_code = exc.response["Error"]["Code"] assert err["Code"] == "InvalidRequestException"
error_code.should.equal("InvalidRequestException")
else:
raise Exception("Should have raised error")
@mock_iot @mock_iot
@ -59,32 +47,34 @@ def test_principal_policy():
client.attach_policy(policyName=policy_name, target=cert_arn) client.attach_policy(policyName=policy_name, target=cert_arn)
res = client.list_principal_policies(principal=cert_arn) res = client.list_principal_policies(principal=cert_arn)
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
for policy in res["policies"]: for policy in res["policies"]:
policy.should.have.key("policyName").which.should_not.be.none assert policy["policyName"] is not None
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
# do nothing if policy have already attached to certificate # do nothing if policy have already attached to certificate
client.attach_policy(policyName=policy_name, target=cert_arn) client.attach_policy(policyName=policy_name, target=cert_arn)
res = client.list_principal_policies(principal=cert_arn) res = client.list_principal_policies(principal=cert_arn)
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
for policy in res["policies"]: for policy in res["policies"]:
policy.should.have.key("policyName").which.should_not.be.none assert policy["policyName"] is not None
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
res = client.list_policy_principals(policyName=policy_name) res = client.list_policy_principals(policyName=policy_name)
res.should.have.key("principals").length_of(1) assert len(res["principals"]) == 1
res["principals"][0].should.match(f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:cert/") assert res["principals"][0].startswith(
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:cert/"
)
client.detach_policy(policyName=policy_name, target=cert_arn) client.detach_policy(policyName=policy_name, target=cert_arn)
res = client.list_principal_policies(principal=cert_arn) res = client.list_principal_policies(principal=cert_arn)
res.should.have.key("policies").which.should.have.length_of(0) assert len(res["policies"]) == 0
res = client.list_policy_principals(policyName=policy_name) res = client.list_policy_principals(policyName=policy_name)
res.should.have.key("principals").which.should.have.length_of(0) assert len(res["principals"]) == 0
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.detach_policy(policyName=policy_name, target=cert_arn) client.detach_policy(policyName=policy_name, target=cert_arn)
e.value.response["Error"]["Code"].should.equal("ResourceNotFoundException") assert e.value.response["Error"]["Code"] == "ResourceNotFoundException"
@mock_iot @mock_iot
@ -99,21 +89,24 @@ def test_principal_policy_deprecated():
client.attach_principal_policy(policyName=policy_name, principal=cert_arn) client.attach_principal_policy(policyName=policy_name, principal=cert_arn)
res = client.list_principal_policies(principal=cert_arn) res = client.list_principal_policies(principal=cert_arn)
res.should.have.key("policies").length_of(1) assert len(res["policies"]) == 1
res["policies"][0].should.have.key("policyName").equal("my-policy") assert res["policies"][0]["policyName"] == "my-policy"
res["policies"][0].should.have.key("policyArn").equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:policy/my-policy" res["policies"][0]["policyArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:policy/my-policy"
) )
res = client.list_policy_principals(policyName=policy_name) res = client.list_policy_principals(policyName=policy_name)
res.should.have.key("principals").length_of(1) assert len(res["principals"]) == 1
res["principals"][0].should.match(f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:cert/") assert res["principals"][0].startswith(
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:cert/"
)
client.detach_principal_policy(policyName=policy_name, principal=cert_arn) client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
res = client.list_principal_policies(principal=cert_arn) res = client.list_principal_policies(principal=cert_arn)
res.should.have.key("policies").which.should.have.length_of(0) assert len(res["policies"]) == 0
res = client.list_policy_principals(policyName=policy_name) res = client.list_policy_principals(policyName=policy_name)
res.should.have.key("principals").which.should.have.length_of(0) assert len(res["principals"]) == 0
@mock_iot @mock_iot
@ -127,22 +120,25 @@ def test_principal_thing():
client.attach_thing_principal(thingName=thing_name, principal=cert_arn) client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
res = client.list_principal_things(principal=cert_arn) res = client.list_principal_things(principal=cert_arn)
res.should.have.key("things").which.should.have.length_of(1) assert len(res["things"]) == 1
res["things"][0].should.equal(thing_name) assert res["things"][0] == thing_name
res = client.list_thing_principals(thingName=thing_name) res = client.list_thing_principals(thingName=thing_name)
res.should.have.key("principals").length_of(1) assert len(res["principals"]) == 1
res["principals"][0].should.match(f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:cert/") assert res["principals"][0].startswith(
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:cert/"
)
client.detach_thing_principal(thingName=thing_name, principal=cert_arn) client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
res = client.list_principal_things(principal=cert_arn) res = client.list_principal_things(principal=cert_arn)
res.should.have.key("things").which.should.have.length_of(0) assert len(res["things"]) == 0
res = client.list_thing_principals(thingName=thing_name) res = client.list_thing_principals(thingName=thing_name)
res.should.have.key("principals").which.should.have.length_of(0) assert len(res["principals"]) == 0
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.list_thing_principals(thingName="xxx") client.list_thing_principals(thingName="xxx")
e.value.response["Error"]["Code"].should.equal("ResourceNotFoundException") assert e.value.response["Error"]["Code"] == "ResourceNotFoundException"
e.value.response["Error"]["Message"].should.equal( assert (
"Failed to list principals for thing xxx because the thing does not exist in your account" e.value.response["Error"]["Message"]
== "Failed to list principals for thing xxx because the thing does not exist in your account"
) )

View File

@ -14,8 +14,8 @@ def test_register_ca_certificate_simple():
verificationCertificate="verification_certificate", verificationCertificate="verification_certificate",
) )
resp.should.have.key("certificateArn") assert "certificateArn" in resp
resp.should.have.key("certificateId") assert "certificateId" in resp
@mock_iot @mock_iot
@ -25,7 +25,7 @@ def test_describe_ca_certificate_unknown():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.describe_ca_certificate(certificateId="a" * 70) client.describe_ca_certificate(certificateId="a" * 70)
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException") assert err["Code"] == "ResourceNotFoundException"
@mock_iot @mock_iot
@ -39,12 +39,12 @@ def test_describe_ca_certificate_simple():
describe = client.describe_ca_certificate(certificateId=resp["certificateId"]) describe = client.describe_ca_certificate(certificateId=resp["certificateId"])
describe.should.have.key("certificateDescription") assert "certificateDescription" in describe
description = describe["certificateDescription"] description = describe["certificateDescription"]
description.should.have.key("certificateArn").equals(resp["certificateArn"]) assert description["certificateArn"] == resp["certificateArn"]
description.should.have.key("certificateId").equals(resp["certificateId"]) assert description["certificateId"] == resp["certificateId"]
description.should.have.key("status").equals("INACTIVE") assert description["status"] == "INACTIVE"
description.should.have.key("certificatePem").equals("ca_certificate") assert description["certificatePem"] == "ca_certificate"
@mock_iot @mock_iot
@ -63,17 +63,15 @@ def test_describe_ca_certificate_advanced():
describe = client.describe_ca_certificate(certificateId=resp["certificateId"]) describe = client.describe_ca_certificate(certificateId=resp["certificateId"])
describe.should.have.key("certificateDescription")
description = describe["certificateDescription"] description = describe["certificateDescription"]
description.should.have.key("certificateArn").equals(resp["certificateArn"]) assert description["certificateArn"] == resp["certificateArn"]
description.should.have.key("certificateId").equals(resp["certificateId"]) assert description["certificateId"] == resp["certificateId"]
description.should.have.key("status").equals("ACTIVE") assert description["status"] == "ACTIVE"
description.should.have.key("certificatePem").equals("ca_certificate") assert description["certificatePem"] == "ca_certificate"
describe.should.have.key("registrationConfig")
config = describe["registrationConfig"] config = describe["registrationConfig"]
config.should.have.key("templateBody").equals("template_b0dy") assert config["templateBody"] == "template_b0dy"
config.should.have.key("roleArn").equals("aws:iot:arn:role/asdfqwerwe") assert config["roleArn"] == "aws:iot:arn:role/asdfqwerwe"
@mock_iot @mock_iot
@ -89,7 +87,7 @@ def test_list_certificates_by_ca():
# list certificates should be empty at first # list certificates should be empty at first
certs = client.list_certificates_by_ca(caCertificateId=ca_cert_id) certs = client.list_certificates_by_ca(caCertificateId=ca_cert_id)
certs.should.have.key("certificates").equals([]) assert certs["certificates"] == []
# create one certificate # create one certificate
cert1 = client.register_certificate( cert1 = client.register_certificate(
@ -98,8 +96,8 @@ def test_list_certificates_by_ca():
# list certificates should return this # list certificates should return this
certs = client.list_certificates_by_ca(caCertificateId=ca_cert_id) certs = client.list_certificates_by_ca(caCertificateId=ca_cert_id)
certs.should.have.key("certificates").length_of(1) assert len(certs["certificates"]) == 1
certs["certificates"][0]["certificateId"].should.equal(cert1["certificateId"]) assert certs["certificates"][0]["certificateId"] == cert1["certificateId"]
# create another certificate, without ca # create another certificate, without ca
client.register_certificate( client.register_certificate(
@ -110,7 +108,7 @@ def test_list_certificates_by_ca():
# list certificate should still only return the first certificate # list certificate should still only return the first certificate
certs = client.list_certificates_by_ca(caCertificateId=ca_cert_id) certs = client.list_certificates_by_ca(caCertificateId=ca_cert_id)
certs.should.have.key("certificates").length_of(1) assert len(certs["certificates"]) == 1
@mock_iot @mock_iot
@ -127,7 +125,7 @@ def test_delete_ca_certificate():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.describe_ca_certificate(certificateId=cert_id) client.describe_ca_certificate(certificateId=cert_id)
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException") assert err["Code"] == "ResourceNotFoundException"
@mock_iot @mock_iot
@ -142,10 +140,11 @@ def test_update_ca_certificate__status():
client.update_ca_certificate(certificateId=cert_id, newStatus="DISABLE") client.update_ca_certificate(certificateId=cert_id, newStatus="DISABLE")
cert = client.describe_ca_certificate(certificateId=cert_id) cert = client.describe_ca_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"] cert_desc = cert["certificateDescription"]
cert_desc.should.have.key("status").which.should.equal("DISABLE") assert cert_desc["status"] == "DISABLE"
cert.should.have.key("registrationConfig").equals( assert cert["registrationConfig"] == {
{"roleArn": "my:old_and_busted:arn", "templateBody": "tb"} "roleArn": "my:old_and_busted:arn",
) "templateBody": "tb",
}
@mock_iot @mock_iot
@ -162,10 +161,11 @@ def test_update_ca_certificate__config():
) )
cert = client.describe_ca_certificate(certificateId=cert_id) cert = client.describe_ca_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"] cert_desc = cert["certificateDescription"]
cert_desc.should.have.key("status").which.should.equal("INACTIVE") assert cert_desc["status"] == "INACTIVE"
cert.should.have.key("registrationConfig").equals( assert cert["registrationConfig"] == {
{"roleArn": "my:new_and_fancy:arn", "templateBody": "tb"} "roleArn": "my:new_and_fancy:arn",
) "templateBody": "tb",
}
@mock_iot @mock_iot
@ -173,4 +173,4 @@ def test_get_registration_code():
client = boto3.client("iot", region_name="us-west-1") client = boto3.client("iot", region_name="us-west-1")
resp = client.get_registration_code() resp = client.get_registration_code()
resp.should.have.key("registrationCode") assert "registrationCode" in resp

View File

@ -15,7 +15,7 @@ def test_certificate_id_generation_deterministic():
cert2 = client.register_certificate( cert2 = client.register_certificate(
certificatePem=cert1["certificatePem"], setAsActive=False certificatePem=cert1["certificatePem"], setAsActive=False
) )
cert2.should.have.key("certificateId").which.should.equal(cert1["certificateId"]) assert cert2["certificateId"] == cert1["certificateId"]
client.delete_certificate(certificateId=cert2["certificateId"]) client.delete_certificate(certificateId=cert2["certificateId"])
@ -25,32 +25,25 @@ def test_create_certificate_from_csr():
client = boto3.client("iot", region_name="us-east-2") client = boto3.client("iot", region_name="us-east-2")
resp = client.create_certificate_from_csr(certificateSigningRequest=csr) resp = client.create_certificate_from_csr(certificateSigningRequest=csr)
resp.should.have.key("certificateArn") assert "certificateArn" in resp
resp.should.have.key("certificateId") assert "certificateId" in resp
resp.should.have.key("certificatePem") assert "certificatePem" in resp
# Can create certificate a second time # Can create certificate a second time
client.create_certificate_from_csr(certificateSigningRequest=csr) client.create_certificate_from_csr(certificateSigningRequest=csr)
client.list_certificates().should.have.key("certificates").length_of(2) assert len(client.list_certificates()["certificates"]) == 2
@mock_iot @mock_iot
def test_create_key_and_certificate(): def test_create_key_and_certificate():
client = boto3.client("iot", region_name="us-east-1") client = boto3.client("iot", region_name="us-east-1")
cert = client.create_keys_and_certificate(setAsActive=True) cert = client.create_keys_and_certificate(setAsActive=True)
cert.should.have.key("certificateArn").which.should_not.be.none assert cert["certificateArn"] is not None
cert.should.have.key("certificateId").which.should_not.be.none assert cert["certificateId"] is not None
cert.should.have.key("certificatePem").which.should.match( assert cert["certificatePem"].startswith("-----BEGIN CERTIFICATE-----")
r"^-----BEGIN CERTIFICATE-----" assert cert["keyPair"]["PublicKey"].startswith("-----BEGIN PUBLIC KEY-----")
) assert cert["keyPair"]["PrivateKey"].startswith("-----BEGIN RSA PRIVATE KEY-----")
cert.should.have.key("keyPair")
cert["keyPair"].should.have.key("PublicKey").which.should.match(
r"^-----BEGIN PUBLIC KEY-----"
)
cert["keyPair"].should.have.key("PrivateKey").which.should.match(
r"^-----BEGIN RSA PRIVATE KEY-----"
)
@mock_iot @mock_iot
@ -60,16 +53,15 @@ def test_describe_certificate_by_id():
cert_id = cert["certificateId"] cert_id = cert["certificateId"]
cert = client.describe_certificate(certificateId=cert_id) cert = client.describe_certificate(certificateId=cert_id)
cert.should.have.key("certificateDescription")
cert_desc = cert["certificateDescription"] cert_desc = cert["certificateDescription"]
cert_desc.should.have.key("certificateArn").which.should_not.be.none assert cert_desc["certificateArn"] is not None
cert_desc.should.have.key("certificateId").which.should_not.be.none assert cert_desc["certificateId"] is not None
cert_desc.should.have.key("certificatePem").which.should_not.be.none assert cert_desc["certificatePem"] is not None
cert_desc.should.have.key("validity").which.should_not.be.none assert cert_desc["validity"] is not None
validity = cert_desc["validity"] validity = cert_desc["validity"]
validity.should.have.key("notBefore").which.should_not.be.none assert validity["notBefore"] is not None
validity.should.have.key("notAfter").which.should_not.be.none assert validity["notAfter"] is not None
cert_desc.should.have.key("status").which.should.equal("ACTIVE") assert cert_desc["status"] == "ACTIVE"
@mock_iot @mock_iot
@ -80,15 +72,15 @@ def test_list_certificates():
res = client.list_certificates() res = client.list_certificates()
for cert in res["certificates"]: for cert in res["certificates"]:
cert.should.have.key("certificateArn").which.should_not.be.none assert cert["certificateArn"] is not None
cert.should.have.key("certificateId").which.should_not.be.none assert cert["certificateId"] is not None
cert.should.have.key("status").which.should_not.be.none assert cert["status"] is not None
cert.should.have.key("creationDate").which.should_not.be.none assert cert["creationDate"] is not None
client.update_certificate(certificateId=cert_id, newStatus="REVOKED") client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
cert = client.describe_certificate(certificateId=cert_id) cert = client.describe_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"] cert_desc = cert["certificateDescription"]
cert_desc.should.have.key("status").which.should.equal("REVOKED") assert cert_desc["status"] == "REVOKED"
@mock_iot @mock_iot
@ -100,7 +92,7 @@ def test_update_certificate():
client.update_certificate(certificateId=cert_id, newStatus="REVOKED") client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
cert = client.describe_certificate(certificateId=cert_id) cert = client.describe_certificate(certificateId=cert_id)
cert_desc = cert["certificateDescription"] cert_desc = cert["certificateDescription"]
cert_desc.should.have.key("status").which.should.equal("REVOKED") assert cert_desc["status"] == "REVOKED"
@pytest.mark.parametrize("status", ["REVOKED", "INACTIVE"]) @pytest.mark.parametrize("status", ["REVOKED", "INACTIVE"])
@ -115,7 +107,7 @@ def test_delete_certificate_with_status(status):
client.delete_certificate(certificateId=cert_id) client.delete_certificate(certificateId=cert_id)
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates").equals([]) assert res["certificates"] == []
@mock_iot @mock_iot
@ -132,21 +124,21 @@ def test_register_certificate_without_ca():
cert = client.register_certificate_without_ca( cert = client.register_certificate_without_ca(
certificatePem=cert_pem, status="INACTIVE" certificatePem=cert_pem, status="INACTIVE"
) )
cert.should.have.key("certificateId").which.should_not.be.none assert cert["certificateId"] is not None
cert.should.have.key("certificateArn").which.should_not.be.none assert cert["certificateArn"] is not None
cert_id = cert["certificateId"] cert_id = cert["certificateId"]
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates").which.should.have.length_of(1) assert len(res["certificates"]) == 1
for cert in res["certificates"]: for cert in res["certificates"]:
cert.should.have.key("certificateArn").which.should_not.be.none assert cert["certificateArn"] is not None
cert.should.have.key("certificateId").which.should_not.be.none assert cert["certificateId"] is not None
cert.should.have.key("status").which.should_not.be.none assert cert["status"] is not None
cert.should.have.key("creationDate").which.should_not.be.none assert cert["creationDate"] is not None
client.delete_certificate(certificateId=cert_id) client.delete_certificate(certificateId=cert_id)
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates") assert "certificates" in res
@mock_iot @mock_iot
@ -159,18 +151,20 @@ def test_create_certificate_validation():
client.register_certificate( client.register_certificate(
certificatePem=cert["certificatePem"], setAsActive=False certificatePem=cert["certificatePem"], setAsActive=False
) )
e.value.response["Error"]["Message"].should.contain( assert (
"The certificate is already provisioned or registered" "The certificate is already provisioned or registered"
in e.value.response["Error"]["Message"]
) )
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.register_certificate_without_ca( client.register_certificate_without_ca(
certificatePem=cert["certificatePem"], status="ACTIVE" certificatePem=cert["certificatePem"], status="ACTIVE"
) )
e.value.response.should.have.key("resourceArn").equals(cert["certificateArn"]) assert e.value.response["resourceArn"] == cert["certificateArn"]
e.value.response.should.have.key("resourceId").equals(cert["certificateId"]) assert e.value.response["resourceId"] == cert["certificateId"]
e.value.response["Error"]["Message"].should.contain( assert (
"The certificate is already provisioned or registered" "The certificate is already provisioned or registered"
in e.value.response["Error"]["Message"]
) )
@ -202,34 +196,37 @@ def test_delete_certificate_validation():
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id) client.delete_certificate(certificateId=cert_id)
e.value.response["Error"]["Message"].should.contain( assert (
"Certificate must be deactivated (not ACTIVE) before deletion." "Certificate must be deactivated (not ACTIVE) before deletion."
in e.value.response["Error"]["Message"]
) )
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates").which.should.have.length_of(1) assert len(res["certificates"]) == 1
client.update_certificate(certificateId=cert_id, newStatus="REVOKED") client.update_certificate(certificateId=cert_id, newStatus="REVOKED")
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id) client.delete_certificate(certificateId=cert_id)
e.value.response["Error"]["Message"].should.contain( assert (
f"Things must be detached before deletion (arn: {cert_arn})" f"Things must be detached before deletion (arn: {cert_arn})"
in e.value.response["Error"]["Message"]
) )
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates").which.should.have.length_of(1) assert len(res["certificates"]) == 1
client.detach_thing_principal(thingName=thing_name, principal=cert_arn) client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id) client.delete_certificate(certificateId=cert_id)
e.value.response["Error"]["Message"].should.contain( assert (
f"Certificate policies must be detached before deletion (arn: {cert_arn})" f"Certificate policies must be detached before deletion (arn: {cert_arn})"
in e.value.response["Error"]["Message"]
) )
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates").which.should.have.length_of(1) assert len(res["certificates"]) == 1
client.detach_principal_policy(policyName=policy_name, principal=cert_arn) client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
client.delete_certificate(certificateId=cert_id) client.delete_certificate(certificateId=cert_id)
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates").which.should.have.length_of(0) assert len(res["certificates"]) == 0
@mock_iot @mock_iot
@ -247,14 +244,14 @@ def test_delete_certificate_force():
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.delete_certificate(certificateId=cert_id, forceDelete=True) client.delete_certificate(certificateId=cert_id, forceDelete=True)
err = e.value.response["Error"] err = e.value.response["Error"]
err["Message"].should.contain("Certificate must be deactivated") assert "Certificate must be deactivated" in err["Message"]
client.update_certificate(certificateId=cert_id, newStatus="INACTIVE") client.update_certificate(certificateId=cert_id, newStatus="INACTIVE")
# If does work if the status is INACTIVE, even though we still have policies attached # If does work if the status is INACTIVE, even though we still have policies attached
client.delete_certificate(certificateId=cert_id, forceDelete=True) client.delete_certificate(certificateId=cert_id, forceDelete=True)
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates").which.should.have.length_of(0) assert len(res["certificates"]) == 0
@mock_iot @mock_iot
@ -272,9 +269,10 @@ def test_delete_thing_with_certificate_validation():
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.delete_thing(thingName=thing_name) client.delete_thing(thingName=thing_name)
err = e.value.response["Error"] err = e.value.response["Error"]
err["Code"].should.equal("InvalidRequestException") assert err["Code"] == "InvalidRequestException"
err["Message"].should.equals( assert (
f"Cannot delete. Thing {thing_name} is still attached to one or more principals" err["Message"]
== f"Cannot delete. Thing {thing_name} is still attached to one or more principals"
) )
client.detach_thing_principal(thingName=thing_name, principal=cert_arn) client.detach_thing_principal(thingName=thing_name, principal=cert_arn)
@ -283,9 +281,9 @@ def test_delete_thing_with_certificate_validation():
# Certificate still exists # Certificate still exists
res = client.list_certificates() res = client.list_certificates()
res.should.have.key("certificates").which.should.have.length_of(1) assert len(res["certificates"]) == 1
res["certificates"][0]["certificateArn"].should.equal(cert_arn) assert res["certificates"][0]["certificateArn"] == cert_arn
res["certificates"][0]["status"].should.equal("REVOKED") assert res["certificates"][0]["status"] == "REVOKED"
@mock_iot @mock_iot
@ -295,12 +293,10 @@ def test_certs_create_inactive():
cert_id = cert["certificateId"] cert_id = cert["certificateId"]
cert = client.describe_certificate(certificateId=cert_id) cert = client.describe_certificate(certificateId=cert_id)
cert.should.have.key("certificateDescription")
cert_desc = cert["certificateDescription"] cert_desc = cert["certificateDescription"]
cert_desc.should.have.key("status").which.should.equal("INACTIVE") assert cert_desc["status"] == "INACTIVE"
client.update_certificate(certificateId=cert_id, newStatus="ACTIVE") client.update_certificate(certificateId=cert_id, newStatus="ACTIVE")
cert = client.describe_certificate(certificateId=cert_id) cert = client.describe_certificate(certificateId=cert_id)
cert.should.have.key("certificateDescription")
cert_desc = cert["certificateDescription"] cert_desc = cert["certificateDescription"]
cert_desc.should.have.key("status").which.should.equal("ACTIVE") assert cert_desc["status"] == "ACTIVE"

View File

@ -14,16 +14,16 @@ def test_deprecate_undeprecate_thing_type():
) )
res = client.describe_thing_type(thingTypeName=thing_type_name) res = client.describe_thing_type(thingTypeName=thing_type_name)
res["thingTypeMetadata"]["deprecated"].should.equal(False) assert res["thingTypeMetadata"]["deprecated"] is False
client.deprecate_thing_type(thingTypeName=thing_type_name, undoDeprecate=False) client.deprecate_thing_type(thingTypeName=thing_type_name, undoDeprecate=False)
res = client.describe_thing_type(thingTypeName=thing_type_name) res = client.describe_thing_type(thingTypeName=thing_type_name)
res["thingTypeMetadata"]["deprecated"].should.equal(True) assert res["thingTypeMetadata"]["deprecated"] is True
client.deprecate_thing_type(thingTypeName=thing_type_name, undoDeprecate=True) client.deprecate_thing_type(thingTypeName=thing_type_name, undoDeprecate=True)
res = client.describe_thing_type(thingTypeName=thing_type_name) res = client.describe_thing_type(thingTypeName=thing_type_name)
res["thingTypeMetadata"]["deprecated"].should.equal(False) assert res["thingTypeMetadata"]["deprecated"] is False
@mock_iot @mock_iot

View File

@ -10,10 +10,8 @@ def test_create_domain_configuration_only_name():
domain_config = client.create_domain_configuration( domain_config = client.create_domain_configuration(
domainConfigurationName="testConfig" domainConfigurationName="testConfig"
) )
domain_config.should.have.key("domainConfigurationName").which.should.equal( assert domain_config["domainConfigurationName"] == "testConfig"
"testConfig" assert domain_config["domainConfigurationArn"] is not None
)
domain_config.should.have.key("domainConfigurationArn").which.should_not.be.none
@mock_iot @mock_iot
@ -22,15 +20,13 @@ def test_create_duplicate_domain_configuration_fails():
domain_config = client.create_domain_configuration( domain_config = client.create_domain_configuration(
domainConfigurationName="testConfig" domainConfigurationName="testConfig"
) )
domain_config.should.have.key("domainConfigurationName").which.should.equal( assert domain_config["domainConfigurationName"] == "testConfig"
"testConfig" assert domain_config["domainConfigurationArn"] is not None
)
domain_config.should.have.key("domainConfigurationArn").which.should_not.be.none
with pytest.raises(client.exceptions.ResourceAlreadyExistsException) as exc: with pytest.raises(client.exceptions.ResourceAlreadyExistsException) as exc:
client.create_domain_configuration(domainConfigurationName="testConfig") client.create_domain_configuration(domainConfigurationName="testConfig")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ResourceAlreadyExistsException") assert err["Code"] == "ResourceAlreadyExistsException"
err["Message"].should.equal("Domain configuration with given name already exists.") assert err["Message"] == "Domain configuration with given name already exists."
@mock_iot @mock_iot
@ -47,10 +43,8 @@ def test_create_domain_configuration_full_params():
}, },
serviceType="DATA", serviceType="DATA",
) )
domain_config.should.have.key("domainConfigurationName").which.should.equal( assert domain_config["domainConfigurationName"] == "testConfig"
"testConfig" assert domain_config["domainConfigurationArn"] is not None
)
domain_config.should.have.key("domainConfigurationArn").which.should_not.be.none
@mock_iot @mock_iot
@ -61,9 +55,10 @@ def test_create_domain_configuration_invalid_service_type():
domainConfigurationName="testConfig", serviceType="INVALIDTYPE" domainConfigurationName="testConfig", serviceType="INVALIDTYPE"
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("InvalidRequestException") assert err["Code"] == "InvalidRequestException"
err["Message"].should.equal( assert (
"An error occurred (InvalidRequestException) when calling the DescribeDomainConfiguration operation: Service type INVALIDTYPE not recognized." err["Message"]
== "An error occurred (InvalidRequestException) when calling the DescribeDomainConfiguration operation: Service type INVALIDTYPE not recognized."
) )
@ -73,8 +68,8 @@ def test_describe_nonexistent_domain_configuration():
with pytest.raises(client.exceptions.ResourceNotFoundException) as exc: with pytest.raises(client.exceptions.ResourceNotFoundException) as exc:
client.describe_domain_configuration(domainConfigurationName="doesntExist") client.describe_domain_configuration(domainConfigurationName="doesntExist")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException") assert err["Code"] == "ResourceNotFoundException"
err["Message"].should.equal("The specified resource does not exist.") assert err["Message"] == "The specified resource does not exist."
@mock_iot @mock_iot
@ -95,18 +90,14 @@ def test_describe_domain_configuration():
described_config = client.describe_domain_configuration( described_config = client.describe_domain_configuration(
domainConfigurationName="testConfig" domainConfigurationName="testConfig"
) )
described_config.should.have.key("domainConfigurationName").which.should.equal( assert described_config["domainConfigurationName"] == "testConfig"
"testConfig" assert described_config["domainConfigurationArn"]
) assert described_config["serverCertificates"]
described_config.should.have.key("domainConfigurationArn") assert described_config["authorizerConfig"]
described_config.should.have.key("serverCertificates") assert described_config["domainConfigurationStatus"] == "ENABLED"
described_config.should.have.key("authorizerConfig") assert described_config["serviceType"] == "DATA"
described_config.should.have.key("domainConfigurationStatus").which.should.equal( assert described_config["domainType"]
"ENABLED" assert described_config["lastStatusChangeDate"]
)
described_config.should.have.key("serviceType").which.should.equal("DATA")
described_config.should.have.key("domainType")
described_config.should.have.key("lastStatusChangeDate")
@mock_iot @mock_iot
@ -131,18 +122,10 @@ def test_update_domain_configuration():
}, },
domainConfigurationStatus="DISABLED", domainConfigurationStatus="DISABLED",
) )
described_updated_config = client.describe_domain_configuration( updated = client.describe_domain_configuration(domainConfigurationName="testConfig")
domainConfigurationName="testConfig" assert updated["authorizerConfig"]["defaultAuthorizerName"] == "updatedName"
) assert updated["authorizerConfig"]["allowAuthorizerOverride"] is False
described_updated_config.should.have.key("authorizerConfig").which.should.have.key( assert updated["domainConfigurationStatus"] == "DISABLED"
"defaultAuthorizerName"
).which.should.equal("updatedName")
described_updated_config.should.have.key("authorizerConfig").which.should.have.key(
"allowAuthorizerOverride"
).which.should.equal(False)
described_updated_config.should.have.key(
"domainConfigurationStatus"
).which.should.equal("DISABLED")
@mock_iot @mock_iot
@ -162,10 +145,8 @@ def test_update_domain_configuration_remove_authorizer_type():
client.update_domain_configuration( client.update_domain_configuration(
domainConfigurationName="testConfig", removeAuthorizerConfig=True domainConfigurationName="testConfig", removeAuthorizerConfig=True
) )
described_updated_config = client.describe_domain_configuration( config = client.describe_domain_configuration(domainConfigurationName="testConfig")
domainConfigurationName="testConfig" assert "authorizerConfig" not in config
)
described_updated_config.should_not.have.key("authorizerConfig")
@mock_iot @mock_iot
@ -174,8 +155,8 @@ def test_update_nonexistent_domain_configuration():
with pytest.raises(client.exceptions.ResourceNotFoundException) as exc: with pytest.raises(client.exceptions.ResourceNotFoundException) as exc:
client.update_domain_configuration(domainConfigurationName="doesntExist") client.update_domain_configuration(domainConfigurationName="doesntExist")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException") assert err["Code"] == "ResourceNotFoundException"
err["Message"].should.equal("The specified resource does not exist.") assert err["Message"] == "The specified resource does not exist."
@mock_iot @mock_iot
@ -183,16 +164,10 @@ def test_list_domain_configuration():
client = boto3.client("iot", region_name="us-east-1") client = boto3.client("iot", region_name="us-east-1")
client.create_domain_configuration(domainConfigurationName="testConfig1") client.create_domain_configuration(domainConfigurationName="testConfig1")
client.create_domain_configuration(domainConfigurationName="testConfig2") client.create_domain_configuration(domainConfigurationName="testConfig2")
domain_configs = client.list_domain_configurations() domain_configs = client.list_domain_configurations()["domainConfigurations"]
domain_configs.should.have.key("domainConfigurations").which.should.have.length_of( assert len(domain_configs) == 2
2 assert domain_configs[0]["domainConfigurationName"] == "testConfig1"
) assert domain_configs[1]["domainConfigurationName"] == "testConfig2"
domain_configs["domainConfigurations"][0].should.have.key(
"domainConfigurationName"
).which.should.equal("testConfig1")
domain_configs["domainConfigurations"][1].should.have.key(
"domainConfigurationName"
).which.should.equal("testConfig2")
@mock_iot @mock_iot
@ -200,14 +175,10 @@ def test_delete_domain_configuration():
client = boto3.client("iot", region_name="us-east-1") client = boto3.client("iot", region_name="us-east-1")
client.create_domain_configuration(domainConfigurationName="testConfig") client.create_domain_configuration(domainConfigurationName="testConfig")
domain_configs = client.list_domain_configurations() domain_configs = client.list_domain_configurations()
domain_configs.should.have.key("domainConfigurations").which.should.have.length_of( assert len(domain_configs["domainConfigurations"]) == 1
1
)
client.delete_domain_configuration(domainConfigurationName="testConfig") client.delete_domain_configuration(domainConfigurationName="testConfig")
domain_configs = client.list_domain_configurations() domain_configs = client.list_domain_configurations()
domain_configs.should.have.key("domainConfigurations").which.should.have.length_of( assert len(domain_configs["domainConfigurations"]) == 0
0
)
@mock_iot @mock_iot
@ -216,5 +187,5 @@ def test_delete_nonexistent_domain_configuration():
with pytest.raises(client.exceptions.ResourceNotFoundException) as exc: with pytest.raises(client.exceptions.ResourceNotFoundException) as exc:
client.delete_domain_configuration(domainConfigurationName="doesntExist") client.delete_domain_configuration(domainConfigurationName="doesntExist")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException") assert err["Code"] == "ResourceNotFoundException"
err["Message"].should.equal("The specified resource does not exist.") assert err["Message"] == "The specified resource does not exist."

View File

@ -13,8 +13,8 @@ def test_describe_job_execution():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -32,64 +32,43 @@ def test_describe_job_execution():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job.should.have.key("description") assert "description" in job
job_execution = client.describe_job_execution(jobId=job_id, thingName=name) job_execution = client.describe_job_execution(jobId=job_id, thingName=name)
job_execution.should.have.key("execution") assert job_execution["execution"]["jobId"] == job_id
job_execution["execution"].should.have.key("jobId").which.should.equal(job_id) assert job_execution["execution"]["status"] == "QUEUED"
job_execution["execution"].should.have.key("status").which.should.equal("QUEUED") assert job_execution["execution"]["forceCanceled"] is False
job_execution["execution"].should.have.key("forceCanceled").which.should.equal( assert job_execution["execution"]["statusDetails"] == {"detailsMap": {}}
False assert job_execution["execution"]["thingArn"] == thing["thingArn"]
) assert "queuedAt" in job_execution["execution"]
job_execution["execution"].should.have.key("statusDetails").which.should.equal( assert "startedAt" in job_execution["execution"]
{"detailsMap": {}} assert "lastUpdatedAt" in job_execution["execution"]
) assert job_execution["execution"]["executionNumber"] == 123
job_execution["execution"].should.have.key("thingArn").which.should.equal( assert job_execution["execution"]["versionNumber"] == 123
thing["thingArn"] assert job_execution["execution"]["approximateSecondsBeforeTimedOut"] == 123
)
job_execution["execution"].should.have.key("queuedAt")
job_execution["execution"].should.have.key("startedAt")
job_execution["execution"].should.have.key("lastUpdatedAt")
job_execution["execution"].should.have.key("executionNumber").which.should.equal(
123
)
job_execution["execution"].should.have.key("versionNumber").which.should.equal(123)
job_execution["execution"].should.have.key(
"approximateSecondsBeforeTimedOut"
).which.should.equal(123)
job_execution = client.describe_job_execution( job_execution = client.describe_job_execution(
jobId=job_id, thingName=name, executionNumber=123 jobId=job_id, thingName=name, executionNumber=123
) )
job_execution.should.have.key("execution") assert "execution" in job_execution
job_execution["execution"].should.have.key("jobId").which.should.equal(job_id) assert job_execution["execution"]["jobId"] == job_id
job_execution["execution"].should.have.key("status").which.should.equal("QUEUED") assert job_execution["execution"]["status"] == "QUEUED"
job_execution["execution"].should.have.key("forceCanceled").which.should.equal( assert job_execution["execution"]["forceCanceled"] is False
False assert job_execution["execution"]["statusDetails"] == {"detailsMap": {}}
) assert job_execution["execution"]["thingArn"] == thing["thingArn"]
job_execution["execution"].should.have.key("statusDetails").which.should.equal( assert "queuedAt" in job_execution["execution"]
{"detailsMap": {}} assert "startedAt" in job_execution["execution"]
) assert "lastUpdatedAt" in job_execution["execution"]
job_execution["execution"].should.have.key("thingArn").which.should.equal( assert job_execution["execution"]["executionNumber"] == 123
thing["thingArn"] assert job_execution["execution"]["versionNumber"] == 123
) assert job_execution["execution"]["approximateSecondsBeforeTimedOut"] == 123
job_execution["execution"].should.have.key("queuedAt")
job_execution["execution"].should.have.key("startedAt")
job_execution["execution"].should.have.key("lastUpdatedAt")
job_execution["execution"].should.have.key("executionNumber").which.should.equal(
123
)
job_execution["execution"].should.have.key("versionNumber").which.should.equal(123)
job_execution["execution"].should.have.key(
"approximateSecondsBeforeTimedOut"
).which.should.equal(123)
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.describe_job_execution(jobId=job_id, thingName=name, executionNumber=456) client.describe_job_execution(jobId=job_id, thingName=name, executionNumber=456)
error_code = exc.value.response["Error"]["Code"] error_code = exc.value.response["Error"]["Code"]
error_code.should.equal("ResourceNotFoundException") assert error_code == "ResourceNotFoundException"
@mock_iot @mock_iot
@ -99,8 +78,8 @@ def test_cancel_job_execution():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -118,14 +97,14 @@ def test_cancel_job_execution():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job.should.have.key("description") assert "description" in job
client.cancel_job_execution(jobId=job_id, thingName=name) client.cancel_job_execution(jobId=job_id, thingName=name)
job_execution = client.describe_job_execution(jobId=job_id, thingName=name) job_execution = client.describe_job_execution(jobId=job_id, thingName=name)
job_execution.should.have.key("execution") assert "execution" in job_execution
job_execution["execution"].should.have.key("status").which.should.equal("CANCELED") assert job_execution["execution"]["status"] == "CANCELED"
@mock_iot @mock_iot
@ -135,8 +114,8 @@ def test_delete_job_execution():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -154,16 +133,16 @@ def test_delete_job_execution():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job.should.have.key("description") assert "description" in job
client.delete_job_execution(jobId=job_id, thingName=name, executionNumber=123) client.delete_job_execution(jobId=job_id, thingName=name, executionNumber=123)
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.describe_job_execution(jobId=job_id, thingName=name, executionNumber=123) client.describe_job_execution(jobId=job_id, thingName=name, executionNumber=123)
error_code = exc.value.response["Error"]["Code"] error_code = exc.value.response["Error"]["Code"]
error_code.should.equal("ResourceNotFoundException") assert error_code == "ResourceNotFoundException"
@mock_iot @mock_iot
@ -173,8 +152,8 @@ def test_list_job_executions_for_job():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -192,21 +171,15 @@ def test_list_job_executions_for_job():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job.should.have.key("description") assert "description" in job
job_execution = client.list_job_executions_for_job(jobId=job_id) job_execution = client.list_job_executions_for_job(jobId=job_id)
job_execution.should.have.key("executionSummaries") assert job_execution["executionSummaries"][0]["thingArn"] == thing["thingArn"]
job_execution["executionSummaries"][0].should.have.key(
"thingArn"
).which.should.equal(thing["thingArn"])
job_execution = client.list_job_executions_for_job(jobId=job_id, status="QUEUED") job_execution = client.list_job_executions_for_job(jobId=job_id, status="QUEUED")
job_execution.should.have.key("executionSummaries") assert job_execution["executionSummaries"][0]["thingArn"] == thing["thingArn"]
job_execution["executionSummaries"][0].should.have.key(
"thingArn"
).which.should.equal(thing["thingArn"])
@mock_iot @mock_iot
@ -216,8 +189,8 @@ def test_list_job_executions_for_thing():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -235,23 +208,17 @@ def test_list_job_executions_for_thing():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job.should.have.key("description") assert "description" in job
job_execution = client.list_job_executions_for_thing(thingName=name) job_execution = client.list_job_executions_for_thing(thingName=name)
job_execution.should.have.key("executionSummaries") assert job_execution["executionSummaries"][0]["jobId"] == job_id
job_execution["executionSummaries"][0].should.have.key("jobId").which.should.equal(
job_id
)
job_execution = client.list_job_executions_for_thing( job_execution = client.list_job_executions_for_thing(
thingName=name, status="QUEUED" thingName=name, status="QUEUED"
) )
job_execution.should.have.key("executionSummaries") assert job_execution["executionSummaries"][0]["jobId"] == job_id
job_execution["executionSummaries"][0].should.have.key("jobId").which.should.equal(
job_id
)
@mock_iot @mock_iot
@ -269,19 +236,17 @@ def test_list_job_executions_for_thing_paginated():
res = client.list_job_executions_for_thing(thingName=name, maxResults=2) res = client.list_job_executions_for_thing(thingName=name, maxResults=2)
executions = res["executionSummaries"] executions = res["executionSummaries"]
executions.should.have.length_of(2) assert len(executions) == 2
res.should.have.key("nextToken")
res = client.list_job_executions_for_thing( res = client.list_job_executions_for_thing(
thingName=name, maxResults=1, nextToken=res["nextToken"] thingName=name, maxResults=1, nextToken=res["nextToken"]
) )
executions = res["executionSummaries"] executions = res["executionSummaries"]
executions.should.have.length_of(1) assert len(executions) == 1
res.should.have.key("nextToken")
res = client.list_job_executions_for_thing( res = client.list_job_executions_for_thing(
thingName=name, nextToken=res["nextToken"] thingName=name, nextToken=res["nextToken"]
) )
executions = res["executionSummaries"] executions = res["executionSummaries"]
executions.should.have.length_of(7) assert len(executions) == 7
res.shouldnt.have.key("nextToken") assert "nextToken" not in res

View File

@ -14,8 +14,8 @@ def test_create_job():
# "field": "value" # "field": "value"
# } # }
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -33,9 +33,9 @@ def test_create_job():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job.should.have.key("description") assert "description" in job
@mock_iot @mock_iot
@ -48,8 +48,8 @@ def test_list_jobs():
# "field": "value" # "field": "value"
# } # }
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -67,9 +67,9 @@ def test_list_jobs():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job1.should.have.key("jobId").which.should.equal(job_id) assert job1["jobId"] == job_id
job1.should.have.key("jobArn") assert "jobArn" in job1
job1.should.have.key("description") assert "description" in job1
job2 = client.create_job( job2 = client.create_job(
jobId=job_id + "1", jobId=job_id + "1",
@ -84,15 +84,15 @@ def test_list_jobs():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job2.should.have.key("jobId").which.should.equal(job_id + "1") assert job2["jobId"] == job_id + "1"
job2.should.have.key("jobArn") assert "jobArn" in job2
job2.should.have.key("description") assert "description" in job2
jobs = client.list_jobs() jobs = client.list_jobs()
jobs.should.have.key("jobs") assert "jobs" in jobs
jobs.should_not.have.key("nextToken") assert "nextToken" not in jobs
jobs["jobs"][0].should.have.key("jobId").which.should.equal(job_id) assert jobs["jobs"][0]["jobId"] == job_id
jobs["jobs"][1].should.have.key("jobId").which.should.equal(job_id + "1") assert jobs["jobs"][1]["jobId"] == job_id + "1"
@mock_iot @mock_iot
@ -102,8 +102,8 @@ def test_describe_job():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
job = client.create_job( job = client.create_job(
jobId=job_id, jobId=job_id,
@ -117,34 +117,28 @@ def test_describe_job():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job = client.describe_job(jobId=job_id) resp = client.describe_job(jobId=job_id)
job.should.have.key("documentSource") assert "documentSource" in resp
job.should.have.key("job")
job.should.have.key("job").which.should.have.key("jobArn") job = resp["job"]
job.should.have.key("job").which.should.have.key("jobId").which.should.equal(job_id) assert "jobArn" in job
job.should.have.key("job").which.should.have.key("targets") assert job["jobId"] == job_id
job.should.have.key("job").which.should.have.key("jobProcessDetails") assert "targets" in job
job.should.have.key("job").which.should.have.key("lastUpdatedAt") assert "jobProcessDetails" in job
job.should.have.key("job").which.should.have.key("createdAt") assert "lastUpdatedAt" in job
job.should.have.key("job").which.should.have.key("jobExecutionsRolloutConfig") assert "createdAt" in job
job.should.have.key("job").which.should.have.key( assert "jobExecutionsRolloutConfig" in job
"targetSelection" assert job["targetSelection"] == "CONTINUOUS"
).which.should.equal("CONTINUOUS") assert "presignedUrlConfig" in job
job.should.have.key("job").which.should.have.key("presignedUrlConfig") assert (
job.should.have.key("job").which.should.have.key( job["presignedUrlConfig"]["roleArn"]
"presignedUrlConfig" == "arn:aws:iam::1:role/service-role/iot_job_role"
).which.should.have.key("roleArn").which.should.equal(
"arn:aws:iam::1:role/service-role/iot_job_role"
) )
job.should.have.key("job").which.should.have.key( assert job["presignedUrlConfig"]["expiresInSec"] == 123
"presignedUrlConfig" assert job["jobExecutionsRolloutConfig"]["maximumPerMinute"] == 10
).which.should.have.key("expiresInSec").which.should.equal(123)
job.should.have.key("job").which.should.have.key(
"jobExecutionsRolloutConfig"
).which.should.have.key("maximumPerMinute").which.should.equal(10)
@mock_iot @mock_iot
@ -154,8 +148,8 @@ def test_describe_job_1():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -172,33 +166,23 @@ def test_describe_job_1():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job = client.describe_job(jobId=job_id) job = client.describe_job(jobId=job_id)["job"]
job.should.have.key("job") assert "jobArn" in job
job.should.have.key("job").which.should.have.key("jobArn") assert job["jobId"] == job_id
job.should.have.key("job").which.should.have.key("jobId").which.should.equal(job_id) assert "targets" in job
job.should.have.key("job").which.should.have.key("targets") assert "jobProcessDetails" in job
job.should.have.key("job").which.should.have.key("jobProcessDetails") assert "lastUpdatedAt" in job
job.should.have.key("job").which.should.have.key("lastUpdatedAt") assert "createdAt" in job
job.should.have.key("job").which.should.have.key("createdAt") assert job["targetSelection"] == "CONTINUOUS"
job.should.have.key("job").which.should.have.key("jobExecutionsRolloutConfig") assert (
job.should.have.key("job").which.should.have.key( job["presignedUrlConfig"]["roleArn"]
"targetSelection" == "arn:aws:iam::1:role/service-role/iot_job_role"
).which.should.equal("CONTINUOUS")
job.should.have.key("job").which.should.have.key("presignedUrlConfig")
job.should.have.key("job").which.should.have.key(
"presignedUrlConfig"
).which.should.have.key("roleArn").which.should.equal(
"arn:aws:iam::1:role/service-role/iot_job_role"
) )
job.should.have.key("job").which.should.have.key( assert job["presignedUrlConfig"]["expiresInSec"] == 123
"presignedUrlConfig" assert job["jobExecutionsRolloutConfig"]["maximumPerMinute"] == 10
).which.should.have.key("expiresInSec").which.should.equal(123)
job.should.have.key("job").which.should.have.key(
"jobExecutionsRolloutConfig"
).which.should.have.key("maximumPerMinute").which.should.equal(10)
@mock_iot @mock_iot
@ -208,8 +192,8 @@ def test_delete_job():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
job = client.create_job( job = client.create_job(
jobId=job_id, jobId=job_id,
@ -223,16 +207,15 @@ def test_delete_job():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job = client.describe_job(jobId=job_id) job = client.describe_job(jobId=job_id)["job"]
job.should.have.key("job") assert job["jobId"] == job_id
job.should.have.key("job").which.should.have.key("jobId").which.should.equal(job_id)
client.delete_job(jobId=job_id) client.delete_job(jobId=job_id)
client.list_jobs()["jobs"].should.have.length_of(0) assert client.list_jobs()["jobs"] == []
@mock_iot @mock_iot
@ -242,8 +225,8 @@ def test_cancel_job():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
job = client.create_job( job = client.create_job(
jobId=job_id, jobId=job_id,
@ -257,32 +240,22 @@ def test_cancel_job():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job = client.describe_job(jobId=job_id) job = client.describe_job(jobId=job_id)["job"]
job.should.have.key("job") assert job["jobId"] == job_id
job.should.have.key("job").which.should.have.key("jobId").which.should.equal(job_id)
job = client.cancel_job(jobId=job_id, reasonCode="Because", comment="You are") job = client.cancel_job(jobId=job_id, reasonCode="Because", comment="You are")
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job = client.describe_job(jobId=job_id) job = client.describe_job(jobId=job_id)["job"]
job.should.have.key("job") assert job["jobId"] == job_id
job.should.have.key("job").which.should.have.key("jobId").which.should.equal(job_id) assert job["status"] == "CANCELED"
job.should.have.key("job").which.should.have.key("status").which.should.equal( assert job["forceCanceled"] is False
"CANCELED" assert job["reasonCode"] == "Because"
) assert job["comment"] == "You are"
job.should.have.key("job").which.should.have.key(
"forceCanceled"
).which.should.equal(False)
job.should.have.key("job").which.should.have.key("reasonCode").which.should.equal(
"Because"
)
job.should.have.key("job").which.should.have.key("comment").which.should.equal(
"You are"
)
@mock_iot @mock_iot
@ -292,8 +265,8 @@ def test_get_job_document_with_document_source():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
job = client.create_job( job = client.create_job(
jobId=job_id, jobId=job_id,
@ -307,11 +280,11 @@ def test_get_job_document_with_document_source():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job_document = client.get_job_document(jobId=job_id) job_document = client.get_job_document(jobId=job_id)
job_document.should.have.key("document").which.should.equal("") assert job_document["document"] == ""
@mock_iot @mock_iot
@ -321,8 +294,8 @@ def test_get_job_document_with_document():
job_id = "TestJob" job_id = "TestJob"
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# job document # job document
job_document = {"field": "value"} job_document = {"field": "value"}
@ -339,8 +312,8 @@ def test_get_job_document_with_document():
jobExecutionsRolloutConfig={"maximumPerMinute": 10}, jobExecutionsRolloutConfig={"maximumPerMinute": 10},
) )
job.should.have.key("jobId").which.should.equal(job_id) assert job["jobId"] == job_id
job.should.have.key("jobArn") assert "jobArn" in job
job_document = client.get_job_document(jobId=job_id) job_document = client.get_job_document(jobId=job_id)
job_document.should.have.key("document").which.should.equal('{"field": "value"}') assert job_document["document"] == '{"field": "value"}'

View File

@ -30,12 +30,12 @@ def test_attach_policy(iot_client, policy):
iot_client.attach_policy(policyName=policy_name, target=cert_arn) iot_client.attach_policy(policyName=policy_name, target=cert_arn)
res = iot_client.list_attached_policies(target=cert_arn) res = iot_client.list_attached_policies(target=cert_arn)
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
res["policies"][0]["policyName"].should.equal("my-policy") assert res["policies"][0]["policyName"] == "my-policy"
res = iot_client.list_attached_policies(target=cert_arn) res = iot_client.list_attached_policies(target=cert_arn)
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
res["policies"][0]["policyName"].should.equal("my-policy") assert res["policies"][0]["policyName"] == "my-policy"
@mock_cognitoidentity @mock_cognitoidentity
@ -54,8 +54,8 @@ def test_attach_policy_to_identity(region_name, iot_client, policy):
client.attach_policy(policyName=policy_name, target=identity["IdentityId"]) client.attach_policy(policyName=policy_name, target=identity["IdentityId"])
res = client.list_attached_policies(target=identity["IdentityId"]) res = client.list_attached_policies(target=identity["IdentityId"])
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
res["policies"][0]["policyName"].should.equal(policy_name) assert res["policies"][0]["policyName"] == policy_name
def test_detach_policy(iot_client, policy): def test_detach_policy(iot_client, policy):
@ -67,18 +67,18 @@ def test_detach_policy(iot_client, policy):
iot_client.attach_policy(policyName=policy_name, target=cert_arn) iot_client.attach_policy(policyName=policy_name, target=cert_arn)
res = iot_client.list_attached_policies(target=cert_arn) res = iot_client.list_attached_policies(target=cert_arn)
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
res["policies"][0]["policyName"].should.equal(policy_name) assert res["policies"][0]["policyName"] == policy_name
iot_client.detach_policy(policyName=policy_name, target=cert_arn) iot_client.detach_policy(policyName=policy_name, target=cert_arn)
res = iot_client.list_attached_policies(target=cert_arn) res = iot_client.list_attached_policies(target=cert_arn)
res.should.have.key("policies").which.should.be.empty assert res["policies"] == []
def test_list_attached_policies(iot_client): def test_list_attached_policies(iot_client):
cert = iot_client.create_keys_and_certificate(setAsActive=True) cert = iot_client.create_keys_and_certificate(setAsActive=True)
policies = iot_client.list_attached_policies(target=cert["certificateArn"]) policies = iot_client.list_attached_policies(target=cert["certificateArn"])
policies["policies"].should.equal([]) assert policies["policies"] == []
def test_policy_versions(iot_client): def test_policy_versions(iot_client):
@ -86,123 +86,103 @@ def test_policy_versions(iot_client):
doc = "{}" doc = "{}"
policy = iot_client.create_policy(policyName=policy_name, policyDocument=doc) policy = iot_client.create_policy(policyName=policy_name, policyDocument=doc)
policy.should.have.key("policyName").which.should.equal(policy_name) assert policy["policyName"] == policy_name
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
policy.should.have.key("policyDocument").which.should.equal(json.dumps({})) assert policy["policyDocument"] == json.dumps({})
policy.should.have.key("policyVersionId").which.should.equal("1") assert policy["policyVersionId"] == "1"
policy = iot_client.get_policy(policyName=policy_name) policy = iot_client.get_policy(policyName=policy_name)
policy.should.have.key("policyName").which.should.equal(policy_name) assert policy["policyName"] == policy_name
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
policy.should.have.key("policyDocument").which.should.equal(json.dumps({})) assert policy["policyDocument"] == json.dumps({})
policy.should.have.key("defaultVersionId").which.should.equal( assert policy["defaultVersionId"] == policy["defaultVersionId"]
policy["defaultVersionId"]
)
policy1 = iot_client.create_policy_version( policy1 = iot_client.create_policy_version(
policyName=policy_name, policyName=policy_name,
policyDocument=json.dumps({"version": "version_1"}), policyDocument=json.dumps({"version": "version_1"}),
setAsDefault=True, setAsDefault=True,
) )
policy1.should.have.key("policyArn").which.should_not.be.none assert policy1["policyArn"] is not None
policy1.should.have.key("policyDocument").which.should.equal( assert policy1["policyDocument"] == json.dumps({"version": "version_1"})
json.dumps({"version": "version_1"}) assert policy1["policyVersionId"] == "2"
) assert policy1["isDefaultVersion"] is True
policy1.should.have.key("policyVersionId").which.should.equal("2")
policy1.should.have.key("isDefaultVersion").which.should.equal(True)
policy2 = iot_client.create_policy_version( policy2 = iot_client.create_policy_version(
policyName=policy_name, policyName=policy_name,
policyDocument=json.dumps({"version": "version_2"}), policyDocument=json.dumps({"version": "version_2"}),
setAsDefault=False, setAsDefault=False,
) )
policy2.should.have.key("policyArn").which.should_not.be.none assert policy2["policyArn"] is not None
policy2.should.have.key("policyDocument").which.should.equal( assert policy2["policyDocument"] == json.dumps({"version": "version_2"})
json.dumps({"version": "version_2"}) assert policy2["policyVersionId"] == "3"
) assert policy2["isDefaultVersion"] is False
policy2.should.have.key("policyVersionId").which.should.equal("3")
policy2.should.have.key("isDefaultVersion").which.should.equal(False)
policy = iot_client.get_policy(policyName=policy_name) policy = iot_client.get_policy(policyName=policy_name)
policy.should.have.key("policyName").which.should.equal(policy_name) assert policy["policyName"] == policy_name
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
policy.should.have.key("policyDocument").which.should.equal( assert policy["policyDocument"] == json.dumps({"version": "version_1"})
json.dumps({"version": "version_1"}) assert policy["defaultVersionId"] == policy1["policyVersionId"]
)
policy.should.have.key("defaultVersionId").which.should.equal(
policy1["policyVersionId"]
)
policy3 = iot_client.create_policy_version( policy3 = iot_client.create_policy_version(
policyName=policy_name, policyName=policy_name,
policyDocument=json.dumps({"version": "version_3"}), policyDocument=json.dumps({"version": "version_3"}),
setAsDefault=False, setAsDefault=False,
) )
policy3.should.have.key("policyArn").which.should_not.be.none assert policy3["policyArn"] is not None
policy3.should.have.key("policyDocument").which.should.equal( assert policy3["policyDocument"] == json.dumps({"version": "version_3"})
json.dumps({"version": "version_3"}) assert policy3["policyVersionId"] == "4"
) assert policy3["isDefaultVersion"] is False
policy3.should.have.key("policyVersionId").which.should.equal("4")
policy3.should.have.key("isDefaultVersion").which.should.equal(False)
policy4 = iot_client.create_policy_version( policy4 = iot_client.create_policy_version(
policyName=policy_name, policyName=policy_name,
policyDocument=json.dumps({"version": "version_4"}), policyDocument=json.dumps({"version": "version_4"}),
setAsDefault=False, setAsDefault=False,
) )
policy4.should.have.key("policyArn").which.should_not.be.none assert policy4["policyArn"] is not None
policy4.should.have.key("policyDocument").which.should.equal( assert policy4["policyDocument"] == json.dumps({"version": "version_4"})
json.dumps({"version": "version_4"}) assert policy4["policyVersionId"] == "5"
) assert policy4["isDefaultVersion"] is False
policy4.should.have.key("policyVersionId").which.should.equal("5")
policy4.should.have.key("isDefaultVersion").which.should.equal(False)
policy_versions = iot_client.list_policy_versions(policyName=policy_name) policy_versions = iot_client.list_policy_versions(policyName=policy_name)[
policy_versions.should.have.key("policyVersions").which.should.have.length_of(5) "policyVersions"
list( ]
map(lambda item: item["isDefaultVersion"], policy_versions["policyVersions"]) assert len(policy_versions) == 5
).count(True).should.equal(1) assert (
list(map(lambda item: item["isDefaultVersion"], policy_versions)).count(True)
== 1
)
default_policy = list( default_policy = list(
filter(lambda item: item["isDefaultVersion"], policy_versions["policyVersions"]) filter(lambda item: item["isDefaultVersion"], policy_versions)
)
default_policy[0].should.have.key("versionId").should.equal(
policy1["policyVersionId"]
) )
assert default_policy[0]["versionId"] == policy1["policyVersionId"]
policy = iot_client.get_policy(policyName=policy_name) policy = iot_client.get_policy(policyName=policy_name)
policy.should.have.key("policyName").which.should.equal(policy_name) assert policy["policyName"] == policy_name
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
policy.should.have.key("policyDocument").which.should.equal( assert policy["policyDocument"] == json.dumps({"version": "version_1"})
json.dumps({"version": "version_1"}) assert policy["defaultVersionId"] == policy1["policyVersionId"]
)
policy.should.have.key("defaultVersionId").which.should.equal(
policy1["policyVersionId"]
)
iot_client.set_default_policy_version( iot_client.set_default_policy_version(
policyName=policy_name, policyVersionId=policy4["policyVersionId"] policyName=policy_name, policyVersionId=policy4["policyVersionId"]
) )
policy_versions = iot_client.list_policy_versions(policyName=policy_name) policy_versions = iot_client.list_policy_versions(policyName=policy_name)[
policy_versions.should.have.key("policyVersions").which.should.have.length_of(5) "policyVersions"
list( ]
map(lambda item: item["isDefaultVersion"], policy_versions["policyVersions"]) assert len(policy_versions) == 5
).count(True).should.equal(1) assert (
list(map(lambda item: item["isDefaultVersion"], policy_versions)).count(True)
== 1
)
default_policy = list( default_policy = list(
filter(lambda item: item["isDefaultVersion"], policy_versions["policyVersions"]) filter(lambda item: item["isDefaultVersion"], policy_versions)
)
default_policy[0].should.have.key("versionId").should.equal(
policy4["policyVersionId"]
) )
assert default_policy[0]["versionId"] == policy4["policyVersionId"]
policy = iot_client.get_policy(policyName=policy_name) policy = iot_client.get_policy(policyName=policy_name)
policy.should.have.key("policyName").which.should.equal(policy_name) assert policy["policyName"] == policy_name
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
policy.should.have.key("policyDocument").which.should.equal( assert policy["policyDocument"] == json.dumps({"version": "version_4"})
json.dumps({"version": "version_4"}) assert policy["defaultVersionId"] == policy4["policyVersionId"]
)
policy.should.have.key("defaultVersionId").which.should.equal(
policy4["policyVersionId"]
)
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
iot_client.create_policy_version( iot_client.create_policy_version(
@ -211,30 +191,31 @@ def test_policy_versions(iot_client):
setAsDefault=False, setAsDefault=False,
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Message"].should.equal( assert (
f"The policy {policy_name} already has the maximum number of versions (5)" err["Message"]
== f"The policy {policy_name} already has the maximum number of versions (5)"
) )
iot_client.delete_policy_version(policyName=policy_name, policyVersionId="1") iot_client.delete_policy_version(policyName=policy_name, policyVersionId="1")
policy_versions = iot_client.list_policy_versions(policyName=policy_name) policy_versions = iot_client.list_policy_versions(policyName=policy_name)
policy_versions.should.have.key("policyVersions").which.should.have.length_of(4) assert len(policy_versions["policyVersions"]) == 4
iot_client.delete_policy_version( iot_client.delete_policy_version(
policyName=policy_name, policyVersionId=policy1["policyVersionId"] policyName=policy_name, policyVersionId=policy1["policyVersionId"]
) )
policy_versions = iot_client.list_policy_versions(policyName=policy_name) policy_versions = iot_client.list_policy_versions(policyName=policy_name)
policy_versions.should.have.key("policyVersions").which.should.have.length_of(3) assert len(policy_versions["policyVersions"]) == 3
iot_client.delete_policy_version( iot_client.delete_policy_version(
policyName=policy_name, policyVersionId=policy2["policyVersionId"] policyName=policy_name, policyVersionId=policy2["policyVersionId"]
) )
policy_versions = iot_client.list_policy_versions(policyName=policy_name) policy_versions = iot_client.list_policy_versions(policyName=policy_name)
policy_versions.should.have.key("policyVersions").which.should.have.length_of(2) assert len(policy_versions["policyVersions"]) == 2
iot_client.delete_policy_version( iot_client.delete_policy_version(
policyName=policy_name, policyVersionId=policy3["policyVersionId"] policyName=policy_name, policyVersionId=policy3["policyVersionId"]
) )
policy_versions = iot_client.list_policy_versions(policyName=policy_name) policy_versions = iot_client.list_policy_versions(policyName=policy_name)
policy_versions.should.have.key("policyVersions").which.should.have.length_of(1) assert len(policy_versions["policyVersions"]) == 1
# should fail as it"s the default policy. Should use delete_policy instead # should fail as it"s the default policy. Should use delete_policy instead
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
@ -242,7 +223,7 @@ def test_policy_versions(iot_client):
policyName=policy_name, policyVersionId=policy4["policyVersionId"] policyName=policy_name, policyVersionId=policy4["policyVersionId"]
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Message"].should.equal("Cannot delete the default version of a policy") assert err["Message"] == "Cannot delete the default version of a policy"
def test_policy_versions_increment_beyond_5(iot_client, policy): def test_policy_versions_increment_beyond_5(iot_client, policy):
@ -260,7 +241,7 @@ def test_policy_versions_increment_beyond_5(iot_client, policy):
policyDocument=json.dumps({"version": f"version_{v}"}), policyDocument=json.dumps({"version": f"version_{v}"}),
setAsDefault=True, setAsDefault=True,
) )
new_version.should.have.key("policyVersionId").which.should.equal(str(v)) assert new_version["policyVersionId"] == str(v)
iot_client.delete_policy_version( iot_client.delete_policy_version(
policyName=policy_name, policyVersionId=str(v - 1) policyName=policy_name, policyVersionId=str(v - 1)
) )
@ -275,13 +256,13 @@ def test_policy_versions_increment_even_after_version_delete(iot_client, policy)
policyName=policy_name, policyName=policy_name,
policyDocument=json.dumps({"version": "version_2"}), policyDocument=json.dumps({"version": "version_2"}),
) )
new_version.should.have.key("policyVersionId").which.should.equal("2") assert new_version["policyVersionId"] == "2"
iot_client.delete_policy_version(policyName=policy_name, policyVersionId="2") iot_client.delete_policy_version(policyName=policy_name, policyVersionId="2")
third_version = iot_client.create_policy_version( third_version = iot_client.create_policy_version(
policyName=policy_name, policyName=policy_name,
policyDocument=json.dumps({"version": "version_3"}), policyDocument=json.dumps({"version": "version_3"}),
) )
third_version.should.have.key("policyVersionId").which.should.equal("3") assert third_version["policyVersionId"] == "3"
def test_delete_policy_validation(iot_client): def test_delete_policy_validation(iot_client):
@ -306,43 +287,43 @@ def test_delete_policy_validation(iot_client):
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
iot_client.delete_policy(policyName=policy_name) iot_client.delete_policy(policyName=policy_name)
e.value.response["Error"]["Message"].should.contain( assert (
"The policy cannot be deleted as the policy is attached to one or more principals (name=%s)" f"The policy cannot be deleted as the policy is attached to one or more principals (name={policy_name})"
% policy_name in e.value.response["Error"]["Message"]
) )
res = iot_client.list_policies() res = iot_client.list_policies()
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
iot_client.detach_principal_policy(policyName=policy_name, principal=cert_arn) iot_client.detach_principal_policy(policyName=policy_name, principal=cert_arn)
iot_client.delete_policy(policyName=policy_name) iot_client.delete_policy(policyName=policy_name)
res = iot_client.list_policies() res = iot_client.list_policies()
res.should.have.key("policies").which.should.have.length_of(0) assert len(res["policies"]) == 0
def test_policy(iot_client): def test_policy(iot_client):
name = "my-policy" name = "my-policy"
doc = "{}" doc = "{}"
policy = iot_client.create_policy(policyName=name, policyDocument=doc) policy = iot_client.create_policy(policyName=name, policyDocument=doc)
policy.should.have.key("policyName").which.should.equal(name) assert policy["policyName"] == name
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
policy.should.have.key("policyDocument").which.should.equal(doc) assert policy["policyDocument"] == doc
policy.should.have.key("policyVersionId").which.should.equal("1") assert policy["policyVersionId"] == "1"
policy = iot_client.get_policy(policyName=name) policy = iot_client.get_policy(policyName=name)
policy.should.have.key("policyName").which.should.equal(name) assert policy["policyName"] == name
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
policy.should.have.key("policyDocument").which.should.equal(doc) assert policy["policyDocument"] == doc
policy.should.have.key("defaultVersionId").which.should.equal("1") assert policy["defaultVersionId"] == "1"
res = iot_client.list_policies() res = iot_client.list_policies()
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
for policy in res["policies"]: for policy in res["policies"]:
policy.should.have.key("policyName").which.should_not.be.none assert policy["policyName"] is not None
policy.should.have.key("policyArn").which.should_not.be.none assert policy["policyArn"] is not None
iot_client.delete_policy(policyName=name) iot_client.delete_policy(policyName=name)
res = iot_client.list_policies() res = iot_client.list_policies()
res.should.have.key("policies").which.should.have.length_of(0) assert len(res["policies"]) == 0
def test_attach_policy_to_thing_group(iot_client, policy): def test_attach_policy_to_thing_group(iot_client, policy):
@ -353,8 +334,8 @@ def test_attach_policy_to_thing_group(iot_client, policy):
iot_client.attach_policy(policyName=policy_name, target=thing_group_arn) iot_client.attach_policy(policyName=policy_name, target=thing_group_arn)
res = iot_client.list_attached_policies(target=thing_group_arn) res = iot_client.list_attached_policies(target=thing_group_arn)
res.should.have.key("policies").which.should.have.length_of(1) assert len(res["policies"]) == 1
res["policies"][0]["policyName"].should.equal(policy_name) assert res["policies"][0]["policyName"] == policy_name
def test_attach_policy_to_non_existant_thing_group_raises_ResourceNotFoundException( def test_attach_policy_to_non_existant_thing_group_raises_ResourceNotFoundException(
@ -378,14 +359,15 @@ def test_policy_delete_fails_when_versions_exist(iot_client, policy):
) )
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
iot_client.delete_policy(policyName=policy_name) iot_client.delete_policy(policyName=policy_name)
e.value.response["Error"]["Message"].should.contain( assert (
"Cannot delete the policy because it has one or more policy versions attached to it" "Cannot delete the policy because it has one or more policy versions attached to it"
in e.value.response["Error"]["Message"]
) )
def test_list_targets_for_policy_empty(iot_client, policy): def test_list_targets_for_policy_empty(iot_client, policy):
res = iot_client.list_targets_for_policy(policyName=policy["policyName"]) res = iot_client.list_targets_for_policy(policyName=policy["policyName"])
res.should.have.key("targets").which.should.have.length_of(0) assert len(res["targets"]) == 0
def test_list_targets_for_policy_one_attached_thing_group(iot_client, policy): def test_list_targets_for_policy_one_attached_thing_group(iot_client, policy):
@ -396,8 +378,8 @@ def test_list_targets_for_policy_one_attached_thing_group(iot_client, policy):
iot_client.attach_policy(policyName=policy_name, target=thing_group_arn) iot_client.attach_policy(policyName=policy_name, target=thing_group_arn)
res = iot_client.list_targets_for_policy(policyName=policy["policyName"]) res = iot_client.list_targets_for_policy(policyName=policy["policyName"])
res.should.have.key("targets").which.should.have.length_of(1) assert len(res["targets"]) == 1
res["targets"][0].should.equal(thing_group_arn) assert res["targets"][0] == thing_group_arn
def test_list_targets_for_policy_one_attached_certificate(iot_client, policy): def test_list_targets_for_policy_one_attached_certificate(iot_client, policy):
@ -408,16 +390,16 @@ def test_list_targets_for_policy_one_attached_certificate(iot_client, policy):
iot_client.attach_policy(policyName=policy_name, target=cert_arn) iot_client.attach_policy(policyName=policy_name, target=cert_arn)
res = iot_client.list_targets_for_policy(policyName=policy["policyName"]) res = iot_client.list_targets_for_policy(policyName=policy["policyName"])
res.should.have.key("targets").which.should.have.length_of(1) assert len(res["targets"]) == 1
res["targets"][0].should.equal(cert_arn) assert res["targets"][0] == cert_arn
def test_list_targets_for_policy_resource_not_found(iot_client): def test_list_targets_for_policy_resource_not_found(iot_client):
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
iot_client.list_targets_for_policy(policyName="NON_EXISTENT_POLICY_NAME") iot_client.list_targets_for_policy(policyName="NON_EXISTENT_POLICY_NAME")
e.value.response["Error"]["Code"].should.equal("ResourceNotFoundException") assert e.value.response["Error"]["Code"] == "ResourceNotFoundException"
e.value.response["Error"]["Message"].should.contain("Policy not found") assert "Policy not found" in e.value.response["Error"]["Message"]
def test_create_policy_fails_when_name_taken(iot_client, policy): def test_create_policy_fails_when_name_taken(iot_client, policy):
@ -430,12 +412,11 @@ def test_create_policy_fails_when_name_taken(iot_client, policy):
) )
current_policy = iot_client.get_policy(policyName=policy_name) current_policy = iot_client.get_policy(policyName=policy_name)
e.value.response["Error"]["Code"].should.equal("ResourceAlreadyExistsException") assert e.value.response["Error"]["Code"] == "ResourceAlreadyExistsException"
e.value.response["Error"]["Message"].should.equal( assert (
f"Policy cannot be created - name already exists (name={policy_name})" e.value.response["Error"]["Message"]
== f"Policy cannot be created - name already exists (name={policy_name})"
) )
# the policy should not have been overwritten # the policy should not have been overwritten
current_policy.should.have.key("policyDocument").which.should.equal( assert current_policy["policyDocument"] == policy["policyDocument"]
policy["policyDocument"]
)

View File

@ -47,8 +47,8 @@ def test_search_attribute_specific_value(query_string, results):
) )
resp = client.search_index(queryString=query_string) resp = client.search_index(queryString=query_string)
resp.should.have.key("thingGroups").equals([]) assert resp["thingGroups"] == []
resp.should.have.key("things").length_of(len(results)) assert len(resp["things"]) == len(results)
thing_names = [t["thingName"] for t in resp["things"]] thing_names = [t["thingName"] for t in resp["things"]]
set(thing_names).should.equal(results) assert set(thing_names) == results

View File

@ -61,8 +61,8 @@ class TestListThingGroup:
generate_thing_group_tree(client, self.tree_dict) generate_thing_group_tree(client, self.tree_dict)
# test # test
resp = client.list_thing_groups() resp = client.list_thing_groups()
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(8) assert len(resp["thingGroups"]) == 8
@mock_iot @mock_iot
def test_should_list_all_groups_non_recursively(self): def test_should_list_all_groups_non_recursively(self):
@ -71,8 +71,8 @@ class TestListThingGroup:
generate_thing_group_tree(client, self.tree_dict) generate_thing_group_tree(client, self.tree_dict)
# test # test
resp = client.list_thing_groups(recursive=False) resp = client.list_thing_groups(recursive=False)
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(2) assert len(resp["thingGroups"]) == 2
@mock_iot @mock_iot
def test_should_list_all_groups_filtered_by_parent(self): def test_should_list_all_groups_filtered_by_parent(self):
@ -81,17 +81,17 @@ class TestListThingGroup:
generate_thing_group_tree(client, self.tree_dict) generate_thing_group_tree(client, self.tree_dict)
# test # test
resp = client.list_thing_groups(parentGroup=self.group_name_1a) resp = client.list_thing_groups(parentGroup=self.group_name_1a)
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(6) assert len(resp["thingGroups"]) == 6
resp = client.list_thing_groups(parentGroup=self.group_name_2a) resp = client.list_thing_groups(parentGroup=self.group_name_2a)
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(2) assert len(resp["thingGroups"]) == 2
resp = client.list_thing_groups(parentGroup=self.group_name_1b) resp = client.list_thing_groups(parentGroup=self.group_name_1b)
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(0) assert len(resp["thingGroups"]) == 0
with pytest.raises(ClientError) as e: with pytest.raises(ClientError) as e:
client.list_thing_groups(parentGroup="inexistant-group-name") client.list_thing_groups(parentGroup="inexistant-group-name")
e.value.response["Error"]["Code"].should.equal("ResourceNotFoundException") assert e.value.response["Error"]["Code"] == "ResourceNotFoundException"
@mock_iot @mock_iot
def test_should_list_all_groups_filtered_by_parent_non_recursively(self): def test_should_list_all_groups_filtered_by_parent_non_recursively(self):
@ -100,11 +100,11 @@ class TestListThingGroup:
generate_thing_group_tree(client, self.tree_dict) generate_thing_group_tree(client, self.tree_dict)
# test # test
resp = client.list_thing_groups(parentGroup=self.group_name_1a, recursive=False) resp = client.list_thing_groups(parentGroup=self.group_name_1a, recursive=False)
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(2) assert len(resp["thingGroups"]) == 2
resp = client.list_thing_groups(parentGroup=self.group_name_2a, recursive=False) resp = client.list_thing_groups(parentGroup=self.group_name_2a, recursive=False)
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(2) assert len(resp["thingGroups"]) == 2
@mock_iot @mock_iot
def test_should_list_all_groups_filtered_by_name_prefix(self): def test_should_list_all_groups_filtered_by_name_prefix(self):
@ -113,14 +113,14 @@ class TestListThingGroup:
generate_thing_group_tree(client, self.tree_dict) generate_thing_group_tree(client, self.tree_dict)
# test # test
resp = client.list_thing_groups(namePrefixFilter="my-group-name-1") resp = client.list_thing_groups(namePrefixFilter="my-group-name-1")
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(2) assert len(resp["thingGroups"]) == 2
resp = client.list_thing_groups(namePrefixFilter="my-group-name-3") resp = client.list_thing_groups(namePrefixFilter="my-group-name-3")
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(4) assert len(resp["thingGroups"]) == 4
resp = client.list_thing_groups(namePrefixFilter="prefix-which-doesn-not-match") resp = client.list_thing_groups(namePrefixFilter="prefix-which-doesn-not-match")
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(0) assert len(resp["thingGroups"]) == 0
@mock_iot @mock_iot
def test_should_list_all_groups_filtered_by_name_prefix_non_recursively(self): def test_should_list_all_groups_filtered_by_name_prefix_non_recursively(self):
@ -131,13 +131,13 @@ class TestListThingGroup:
resp = client.list_thing_groups( resp = client.list_thing_groups(
namePrefixFilter="my-group-name-1", recursive=False namePrefixFilter="my-group-name-1", recursive=False
) )
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(2) assert len(resp["thingGroups"]) == 2
resp = client.list_thing_groups( resp = client.list_thing_groups(
namePrefixFilter="my-group-name-3", recursive=False namePrefixFilter="my-group-name-3", recursive=False
) )
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(0) assert len(resp["thingGroups"]) == 0
@mock_iot @mock_iot
def test_should_list_all_groups_filtered_by_name_prefix_and_parent(self): def test_should_list_all_groups_filtered_by_name_prefix_and_parent(self):
@ -148,19 +148,19 @@ class TestListThingGroup:
resp = client.list_thing_groups( resp = client.list_thing_groups(
namePrefixFilter="my-group-name-2", parentGroup=self.group_name_1a namePrefixFilter="my-group-name-2", parentGroup=self.group_name_1a
) )
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(2) assert len(resp["thingGroups"]) == 2
resp = client.list_thing_groups( resp = client.list_thing_groups(
namePrefixFilter="my-group-name-3", parentGroup=self.group_name_1a namePrefixFilter="my-group-name-3", parentGroup=self.group_name_1a
) )
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(4) assert len(resp["thingGroups"]) == 4
resp = client.list_thing_groups( resp = client.list_thing_groups(
namePrefixFilter="prefix-which-doesn-not-match", namePrefixFilter="prefix-which-doesn-not-match",
parentGroup=self.group_name_1a, parentGroup=self.group_name_1a,
) )
resp.should.have.key("thingGroups") assert "thingGroups" in resp
resp["thingGroups"].should.have.length_of(0) assert len(resp["thingGroups"]) == 0
@mock_iot @mock_iot
@ -176,24 +176,24 @@ def test_delete_thing_group():
client.delete_thing_group(thingGroupName=group_name_1a) client.delete_thing_group(thingGroupName=group_name_1a)
except client.exceptions.InvalidRequestException as exc: except client.exceptions.InvalidRequestException as exc:
error_code = exc.response["Error"]["Code"] error_code = exc.response["Error"]["Code"]
error_code.should.equal("InvalidRequestException") assert error_code == "InvalidRequestException"
else: else:
raise Exception("Should have raised error") raise Exception("Should have raised error")
# delete child group # delete child group
client.delete_thing_group(thingGroupName=group_name_2a) client.delete_thing_group(thingGroupName=group_name_2a)
res = client.list_thing_groups() res = client.list_thing_groups()
res.should.have.key("thingGroups").which.should.have.length_of(1) assert len(res["thingGroups"]) == 1
res["thingGroups"].should_not.have.key(group_name_2a) assert group_name_2a not in res["thingGroups"]
# now that there is no child group, we can delete the previous group safely # now that there is no child group, we can delete the previous group safely
client.delete_thing_group(thingGroupName=group_name_1a) client.delete_thing_group(thingGroupName=group_name_1a)
res = client.list_thing_groups() res = client.list_thing_groups()
res.should.have.key("thingGroups").which.should.have.length_of(0) assert len(res["thingGroups"]) == 0
# Deleting an invalid thing group does not raise an error. # Deleting an invalid thing group does not raise an error.
res = client.delete_thing_group(thingGroupName="non-existent-group-name") res = client.delete_thing_group(thingGroupName="non-existent-group-name")
res["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) assert res["ResponseMetadata"]["HTTPStatusCode"] == 200
@mock_iot @mock_iot
@ -220,210 +220,94 @@ def test_describe_thing_group_metadata_hierarchy():
# describe groups # describe groups
# groups level 1 # groups level 1
# 1a # 1a
thing_group_description1a = client.describe_thing_group( desc1a = client.describe_thing_group(thingGroupName=group_name_1a)
thingGroupName=group_name_1a assert desc1a["thingGroupName"] == group_name_1a
) assert "thingGroupProperties" in desc1a
thing_group_description1a.should.have.key("thingGroupName").which.should.equal( assert "creationDate" in desc1a["thingGroupMetadata"]
group_name_1a assert "version" in desc1a
)
thing_group_description1a.should.have.key("thingGroupProperties")
thing_group_description1a.should.have.key("thingGroupMetadata")
thing_group_description1a["thingGroupMetadata"].should.have.key("creationDate")
thing_group_description1a.should.have.key("version")
# 1b # 1b
thing_group_description1b = client.describe_thing_group( desc1b = client.describe_thing_group(thingGroupName=group_name_1b)
thingGroupName=group_name_1b assert desc1b["thingGroupName"] == group_name_1b
) assert "thingGroupProperties" in desc1b
thing_group_description1b.should.have.key("thingGroupName").which.should.equal( assert len(desc1b["thingGroupMetadata"]) == 1
group_name_1b assert "creationDate" in desc1b["thingGroupMetadata"]
) assert "version" in desc1b
thing_group_description1b.should.have.key("thingGroupProperties")
thing_group_description1b.should.have.key("thingGroupMetadata")
thing_group_description1b["thingGroupMetadata"].should.have.length_of(1)
thing_group_description1b["thingGroupMetadata"].should.have.key("creationDate")
thing_group_description1b.should.have.key("version")
# groups level 2 # groups level 2
# 2a # 2a
thing_group_description2a = client.describe_thing_group( desc2a = client.describe_thing_group(thingGroupName=group_name_2a)
thingGroupName=group_name_2a assert desc2a["thingGroupName"] == group_name_2a
) assert "thingGroupProperties" in desc2a
thing_group_description2a.should.have.key("thingGroupName").which.should.equal( assert len(desc2a["thingGroupMetadata"]) == 3
group_name_2a assert desc2a["thingGroupMetadata"]["parentGroupName"] == group_name_1a
) desc2a_groups = desc2a["thingGroupMetadata"]["rootToParentThingGroups"]
thing_group_description2a.should.have.key("thingGroupProperties") assert len(desc2a_groups) == 1
thing_group_description2a.should.have.key("thingGroupMetadata") assert desc2a_groups[0]["groupName"] == group_name_1a
thing_group_description2a["thingGroupMetadata"].should.have.length_of(3) assert desc2a_groups[0]["groupArn"] == group_catalog[group_name_1a]["thingGroupArn"]
thing_group_description2a["thingGroupMetadata"].should.have.key( assert "version" in desc2a
"parentGroupName"
).being.equal(group_name_1a)
thing_group_description2a["thingGroupMetadata"].should.have.key(
"rootToParentThingGroups"
)
thing_group_description2a["thingGroupMetadata"][
"rootToParentThingGroups"
].should.have.length_of(1)
thing_group_description2a["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupName"
].should.match(group_name_1a)
thing_group_description2a["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupArn"
].should.match(group_catalog[group_name_1a]["thingGroupArn"])
thing_group_description2a.should.have.key("version")
# 2b # 2b
thing_group_description2b = client.describe_thing_group( desc2b = client.describe_thing_group(thingGroupName=group_name_2b)
thingGroupName=group_name_2b assert desc2b["thingGroupName"] == group_name_2b
) assert "thingGroupProperties" in desc2b
thing_group_description2b.should.have.key("thingGroupName").which.should.equal( assert len(desc2b["thingGroupMetadata"]) == 3
group_name_2b assert desc2b["thingGroupMetadata"]["parentGroupName"] == group_name_1a
) desc2b_groups = desc2b["thingGroupMetadata"]["rootToParentThingGroups"]
thing_group_description2b.should.have.key("thingGroupProperties") assert len(desc2b_groups) == 1
thing_group_description2b.should.have.key("thingGroupMetadata") assert desc2b_groups[0]["groupName"] == group_name_1a
thing_group_description2b["thingGroupMetadata"].should.have.length_of(3) assert desc2b_groups[0]["groupArn"] == group_catalog[group_name_1a]["thingGroupArn"]
thing_group_description2b["thingGroupMetadata"].should.have.key( assert "version" in desc2b
"parentGroupName"
).being.equal(group_name_1a)
thing_group_description2b["thingGroupMetadata"].should.have.key(
"rootToParentThingGroups"
)
thing_group_description2b["thingGroupMetadata"][
"rootToParentThingGroups"
].should.have.length_of(1)
thing_group_description2b["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupName"
].should.match(group_name_1a)
thing_group_description2b["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupArn"
].should.match(group_catalog[group_name_1a]["thingGroupArn"])
thing_group_description2b.should.have.key("version")
# groups level 3 # groups level 3
# 3a # 3a
thing_group_description3a = client.describe_thing_group( desc3a = client.describe_thing_group(thingGroupName=group_name_3a)
thingGroupName=group_name_3a assert desc3a["thingGroupName"] == group_name_3a
) assert "thingGroupProperties" in desc3a
thing_group_description3a.should.have.key("thingGroupName").which.should.equal( assert len(desc3a["thingGroupMetadata"]) == 3
group_name_3a assert desc3a["thingGroupMetadata"]["parentGroupName"] == group_name_2a
) desc3a_groups = desc3a["thingGroupMetadata"]["rootToParentThingGroups"]
thing_group_description3a.should.have.key("thingGroupProperties") assert len(desc3a_groups) == 2
thing_group_description3a.should.have.key("thingGroupMetadata") assert desc3a_groups[0]["groupName"] == group_name_1a
thing_group_description3a["thingGroupMetadata"].should.have.length_of(3) assert desc3a_groups[0]["groupArn"] == group_catalog[group_name_1a]["thingGroupArn"]
thing_group_description3a["thingGroupMetadata"].should.have.key( assert desc3a_groups[1]["groupName"] == group_name_2a
"parentGroupName" assert desc3a_groups[1]["groupArn"] == group_catalog[group_name_2a]["thingGroupArn"]
).being.equal(group_name_2a) assert "version" in desc3a
thing_group_description3a["thingGroupMetadata"].should.have.key(
"rootToParentThingGroups"
)
thing_group_description3a["thingGroupMetadata"][
"rootToParentThingGroups"
].should.have.length_of(2)
thing_group_description3a["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupName"
].should.match(group_name_1a)
thing_group_description3a["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupArn"
].should.match(group_catalog[group_name_1a]["thingGroupArn"])
thing_group_description3a["thingGroupMetadata"]["rootToParentThingGroups"][1][
"groupName"
].should.match(group_name_2a)
thing_group_description3a["thingGroupMetadata"]["rootToParentThingGroups"][1][
"groupArn"
].should.match(group_catalog[group_name_2a]["thingGroupArn"])
thing_group_description3a.should.have.key("version")
# 3b # 3b
thing_group_description3b = client.describe_thing_group( desc3b = client.describe_thing_group(thingGroupName=group_name_3b)
thingGroupName=group_name_3b assert desc3b["thingGroupName"] == group_name_3b
) assert "thingGroupProperties" in desc3b
thing_group_description3b.should.have.key("thingGroupName").which.should.equal( assert len(desc3b["thingGroupMetadata"]) == 3
group_name_3b assert desc3b["thingGroupMetadata"]["parentGroupName"] == group_name_2a
) desc3b_groups = desc3b["thingGroupMetadata"]["rootToParentThingGroups"]
thing_group_description3b.should.have.key("thingGroupProperties") assert len(desc3b_groups) == 2
thing_group_description3b.should.have.key("thingGroupMetadata") assert desc3b_groups[0]["groupName"] == group_name_1a
thing_group_description3b["thingGroupMetadata"].should.have.length_of(3) assert desc3b_groups[0]["groupArn"] == group_catalog[group_name_1a]["thingGroupArn"]
thing_group_description3b["thingGroupMetadata"].should.have.key( assert desc3b_groups[1]["groupName"] == group_name_2a
"parentGroupName" assert desc3b_groups[1]["groupArn"] == group_catalog[group_name_2a]["thingGroupArn"]
).being.equal(group_name_2a) assert "version" in desc3b
thing_group_description3b["thingGroupMetadata"].should.have.key(
"rootToParentThingGroups"
)
thing_group_description3b["thingGroupMetadata"][
"rootToParentThingGroups"
].should.have.length_of(2)
thing_group_description3b["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupName"
].should.match(group_name_1a)
thing_group_description3b["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupArn"
].should.match(group_catalog[group_name_1a]["thingGroupArn"])
thing_group_description3b["thingGroupMetadata"]["rootToParentThingGroups"][1][
"groupName"
].should.match(group_name_2a)
thing_group_description3b["thingGroupMetadata"]["rootToParentThingGroups"][1][
"groupArn"
].should.match(group_catalog[group_name_2a]["thingGroupArn"])
thing_group_description3b.should.have.key("version")
# 3c # 3c
thing_group_description3c = client.describe_thing_group( desc3c = client.describe_thing_group(thingGroupName=group_name_3c)
thingGroupName=group_name_3c assert desc3c["thingGroupName"] == group_name_3c
) assert "thingGroupProperties" in desc3c
thing_group_description3c.should.have.key("thingGroupName").which.should.equal( assert len(desc3c["thingGroupMetadata"]) == 3
group_name_3c assert desc3c["thingGroupMetadata"]["parentGroupName"] == group_name_2b
) desc3c_groups = desc3c["thingGroupMetadata"]["rootToParentThingGroups"]
thing_group_description3c.should.have.key("thingGroupProperties") assert len(desc3c_groups) == 2
thing_group_description3c.should.have.key("thingGroupMetadata") assert desc3c_groups[0]["groupName"] == group_name_1a
thing_group_description3c["thingGroupMetadata"].should.have.length_of(3) assert desc3c_groups[0]["groupArn"] == group_catalog[group_name_1a]["thingGroupArn"]
thing_group_description3c["thingGroupMetadata"].should.have.key( assert desc3c_groups[1]["groupName"] == group_name_2b
"parentGroupName" assert desc3c_groups[1]["groupArn"] == group_catalog[group_name_2b]["thingGroupArn"]
).being.equal(group_name_2b) assert "version" in desc3c
thing_group_description3c["thingGroupMetadata"].should.have.key(
"rootToParentThingGroups"
)
thing_group_description3c["thingGroupMetadata"][
"rootToParentThingGroups"
].should.have.length_of(2)
thing_group_description3c["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupName"
].should.match(group_name_1a)
thing_group_description3c["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupArn"
].should.match(group_catalog[group_name_1a]["thingGroupArn"])
thing_group_description3c["thingGroupMetadata"]["rootToParentThingGroups"][1][
"groupName"
].should.match(group_name_2b)
thing_group_description3c["thingGroupMetadata"]["rootToParentThingGroups"][1][
"groupArn"
].should.match(group_catalog[group_name_2b]["thingGroupArn"])
thing_group_description3c.should.have.key("version")
# 3d # 3d
thing_group_description3d = client.describe_thing_group( desc3d = client.describe_thing_group(thingGroupName=group_name_3d)
thingGroupName=group_name_3d assert desc3d["thingGroupName"] == group_name_3d
) assert "thingGroupProperties" in desc3d
thing_group_description3d.should.have.key("thingGroupName").which.should.equal( assert len(desc3d["thingGroupMetadata"]) == 3
group_name_3d assert desc3d["thingGroupMetadata"]["parentGroupName"] == group_name_2b
) desc3d_groups = desc3d["thingGroupMetadata"]["rootToParentThingGroups"]
thing_group_description3d.should.have.key("thingGroupProperties") assert len(desc3d_groups) == 2
thing_group_description3d.should.have.key("thingGroupMetadata") assert desc3d_groups[0]["groupName"] == group_name_1a
thing_group_description3d["thingGroupMetadata"].should.have.length_of(3) assert desc3d_groups[0]["groupArn"] == group_catalog[group_name_1a]["thingGroupArn"]
thing_group_description3d["thingGroupMetadata"].should.have.key( assert desc3d_groups[1]["groupName"] == group_name_2b
"parentGroupName" assert desc3d_groups[1]["groupArn"] == group_catalog[group_name_2b]["thingGroupArn"]
).being.equal(group_name_2b) assert "version" in desc3d
thing_group_description3d["thingGroupMetadata"].should.have.key(
"rootToParentThingGroups"
)
thing_group_description3d["thingGroupMetadata"][
"rootToParentThingGroups"
].should.have.length_of(2)
thing_group_description3d["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupName"
].should.match(group_name_1a)
thing_group_description3d["thingGroupMetadata"]["rootToParentThingGroups"][0][
"groupArn"
].should.match(group_catalog[group_name_1a]["thingGroupArn"])
thing_group_description3d["thingGroupMetadata"]["rootToParentThingGroups"][1][
"groupName"
].should.match(group_name_2b)
thing_group_description3d["thingGroupMetadata"]["rootToParentThingGroups"][1][
"groupArn"
].should.match(group_catalog[group_name_2b]["thingGroupArn"])
thing_group_description3d.should.have.key("version")
@mock_iot @mock_iot
@ -433,28 +317,28 @@ def test_thing_groups():
# thing group # thing group
thing_group = client.create_thing_group(thingGroupName=group_name) thing_group = client.create_thing_group(thingGroupName=group_name)
thing_group.should.have.key("thingGroupName").which.should.equal(group_name) assert thing_group["thingGroupName"] == group_name
thing_group.should.have.key("thingGroupArn") assert "thingGroupArn" in thing_group
thing_group["thingGroupArn"].should.contain(group_name) assert group_name in thing_group["thingGroupArn"]
res = client.list_thing_groups() res = client.list_thing_groups()
res.should.have.key("thingGroups").which.should.have.length_of(1) assert len(res["thingGroups"]) == 1
for thing_group in res["thingGroups"]: for thing_group in res["thingGroups"]:
thing_group.should.have.key("groupName").which.should_not.be.none assert thing_group["groupName"] is not None
thing_group.should.have.key("groupArn").which.should_not.be.none assert thing_group["groupArn"] is not None
thing_group = client.describe_thing_group(thingGroupName=group_name) thing_group = client.describe_thing_group(thingGroupName=group_name)
thing_group.should.have.key("thingGroupName").which.should.equal(group_name) assert thing_group["thingGroupName"] == group_name
thing_group.should.have.key("thingGroupProperties") assert "thingGroupProperties" in thing_group
thing_group.should.have.key("thingGroupMetadata") assert "thingGroupMetadata" in thing_group
thing_group.should.have.key("version") assert "version" in thing_group
thing_group.should.have.key("thingGroupArn") assert "thingGroupArn" in thing_group
thing_group["thingGroupArn"].should.contain(group_name) assert group_name in thing_group["thingGroupArn"]
# delete thing group # delete thing group
client.delete_thing_group(thingGroupName=group_name) client.delete_thing_group(thingGroupName=group_name)
res = client.list_thing_groups() res = client.list_thing_groups()
res.should.have.key("thingGroups").which.should.have.length_of(0) assert len(res["thingGroups"]) == 0
# props create test # props create test
props = { props = {
@ -464,40 +348,34 @@ def test_thing_groups():
thing_group = client.create_thing_group( thing_group = client.create_thing_group(
thingGroupName=group_name, thingGroupProperties=props thingGroupName=group_name, thingGroupProperties=props
) )
thing_group.should.have.key("thingGroupName").which.should.equal(group_name) assert thing_group["thingGroupName"] == group_name
thing_group.should.have.key("thingGroupArn") assert "thingGroupArn" in thing_group
thing_group = client.describe_thing_group(thingGroupName=group_name) thing_group = client.describe_thing_group(thingGroupName=group_name)
thing_group.should.have.key("thingGroupProperties").which.should.have.key( assert "attributes" in thing_group["thingGroupProperties"]["attributePayload"]
"attributePayload"
).which.should.have.key("attributes")
res_props = thing_group["thingGroupProperties"]["attributePayload"]["attributes"] res_props = thing_group["thingGroupProperties"]["attributePayload"]["attributes"]
res_props.should.have.key("key1").which.should.equal("val01") assert res_props["key1"] == "val01"
res_props.should.have.key("Key02").which.should.equal("VAL2") assert res_props["Key02"] == "VAL2"
# props update test with merge # props update test with merge
new_props = {"attributePayload": {"attributes": {"k3": "v3"}, "merge": True}} new_props = {"attributePayload": {"attributes": {"k3": "v3"}, "merge": True}}
client.update_thing_group(thingGroupName=group_name, thingGroupProperties=new_props) client.update_thing_group(thingGroupName=group_name, thingGroupProperties=new_props)
thing_group = client.describe_thing_group(thingGroupName=group_name) thing_group = client.describe_thing_group(thingGroupName=group_name)
thing_group.should.have.key("thingGroupProperties").which.should.have.key( assert "attributes" in thing_group["thingGroupProperties"]["attributePayload"]
"attributePayload"
).which.should.have.key("attributes")
res_props = thing_group["thingGroupProperties"]["attributePayload"]["attributes"] res_props = thing_group["thingGroupProperties"]["attributePayload"]["attributes"]
res_props.should.have.key("key1").which.should.equal("val01") assert res_props["key1"] == "val01"
res_props.should.have.key("Key02").which.should.equal("VAL2") assert res_props["Key02"] == "VAL2"
res_props.should.have.key("k3").which.should.equal("v3") assert res_props["k3"] == "v3"
# props update test # props update test
new_props = {"attributePayload": {"attributes": {"k4": "v4"}}} new_props = {"attributePayload": {"attributes": {"k4": "v4"}}}
client.update_thing_group(thingGroupName=group_name, thingGroupProperties=new_props) client.update_thing_group(thingGroupName=group_name, thingGroupProperties=new_props)
thing_group = client.describe_thing_group(thingGroupName=group_name) thing_group = client.describe_thing_group(thingGroupName=group_name)
thing_group.should.have.key("thingGroupProperties").which.should.have.key( assert "attributes" in thing_group["thingGroupProperties"]["attributePayload"]
"attributePayload"
).which.should.have.key("attributes")
res_props = thing_group["thingGroupProperties"]["attributePayload"]["attributes"] res_props = thing_group["thingGroupProperties"]["attributePayload"]["attributes"]
res_props.should.have.key("k4").which.should.equal("v4") assert res_props["k4"] == "v4"
res_props.should_not.have.key("key1") assert "key1" not in res_props
@mock_iot @mock_iot
@ -508,13 +386,13 @@ def test_thing_group_relations():
# thing group # thing group
thing_group = client.create_thing_group(thingGroupName=group_name) thing_group = client.create_thing_group(thingGroupName=group_name)
thing_group.should.have.key("thingGroupName").which.should.equal(group_name) assert thing_group["thingGroupName"] == group_name
thing_group.should.have.key("thingGroupArn") assert "thingGroupArn" in thing_group
# thing # thing
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
# add in 4 way # add in 4 way
client.add_thing_to_thing_group(thingGroupName=group_name, thingName=name) client.add_thing_to_thing_group(thingGroupName=group_name, thingName=name)
@ -529,12 +407,12 @@ def test_thing_group_relations():
) )
things = client.list_things_in_thing_group(thingGroupName=group_name) things = client.list_things_in_thing_group(thingGroupName=group_name)
things.should.have.key("things") assert "things" in things
things["things"].should.have.length_of(1) assert len(things["things"]) == 1
thing_groups = client.list_thing_groups_for_thing(thingName=name) thing_groups = client.list_thing_groups_for_thing(thingName=name)
thing_groups.should.have.key("thingGroups") assert "thingGroups" in thing_groups
thing_groups["thingGroups"].should.have.length_of(1) assert len(thing_groups["thingGroups"]) == 1
# remove in 4 way # remove in 4 way
client.remove_thing_from_thing_group(thingGroupName=group_name, thingName=name) client.remove_thing_from_thing_group(thingGroupName=group_name, thingName=name)
@ -548,21 +426,21 @@ def test_thing_group_relations():
thingGroupArn=thing_group["thingGroupArn"], thingName=name thingGroupArn=thing_group["thingGroupArn"], thingName=name
) )
things = client.list_things_in_thing_group(thingGroupName=group_name) things = client.list_things_in_thing_group(thingGroupName=group_name)
things.should.have.key("things") assert "things" in things
things["things"].should.have.length_of(0) assert len(things["things"]) == 0
# update thing group for thing # update thing group for thing
client.update_thing_groups_for_thing(thingName=name, thingGroupsToAdd=[group_name]) client.update_thing_groups_for_thing(thingName=name, thingGroupsToAdd=[group_name])
things = client.list_things_in_thing_group(thingGroupName=group_name) things = client.list_things_in_thing_group(thingGroupName=group_name)
things.should.have.key("things") assert "things" in things
things["things"].should.have.length_of(1) assert len(things["things"]) == 1
client.update_thing_groups_for_thing( client.update_thing_groups_for_thing(
thingName=name, thingGroupsToRemove=[group_name] thingName=name, thingGroupsToRemove=[group_name]
) )
things = client.list_things_in_thing_group(thingGroupName=group_name) things = client.list_things_in_thing_group(thingGroupName=group_name)
things.should.have.key("things") assert "things" in things
things["things"].should.have.length_of(0) assert len(things["things"]) == 0
@mock_iot @mock_iot
@ -614,9 +492,9 @@ def test_thing_group_updates_description():
) )
thing_group = client.describe_thing_group(thingGroupName=name) thing_group = client.describe_thing_group(thingGroupName=name)
thing_group.should.have.key("thingGroupProperties").which.should.have.key( assert (
"thingGroupDescription" thing_group["thingGroupProperties"]["thingGroupDescription"] == new_description
).which.should.equal(new_description) )
@mock_iot @mock_iot
@ -638,9 +516,9 @@ def test_thing_group_update_with_no_previous_attributes_no_merge():
) )
updated_thing_group = client.describe_thing_group(thingGroupName=name) updated_thing_group = client.describe_thing_group(thingGroupName=name)
updated_thing_group.should.have.key("thingGroupProperties").which.should.have.key( assert updated_thing_group["thingGroupProperties"]["attributePayload"][
"attributePayload" "attributes"
).which.should.have.key("attributes").which.should.equal({"key1": "val01"}) ] == {"key1": "val01"}
@mock_iot @mock_iot
@ -662,9 +540,9 @@ def test_thing_group_update_with_no_previous_attributes_with_merge():
) )
updated_thing_group = client.describe_thing_group(thingGroupName=name) updated_thing_group = client.describe_thing_group(thingGroupName=name)
updated_thing_group.should.have.key("thingGroupProperties").which.should.have.key( assert updated_thing_group["thingGroupProperties"]["attributePayload"][
"attributePayload" "attributes"
).which.should.have.key("attributes").which.should.equal({"key1": "val01"}) ] == {"key1": "val01"}
@mock_iot @mock_iot
@ -684,6 +562,4 @@ def test_thing_group_updated_with_empty_attributes_no_merge_no_attributes_added(
) )
updated_thing_group = client.describe_thing_group(thingGroupName=name) updated_thing_group = client.describe_thing_group(thingGroupName=name)
updated_thing_group.should.have.key( assert "attributePayload" not in updated_thing_group["thingGroupProperties"]
"thingGroupProperties"
).which.should_not.have.key("attributePayload")

View File

@ -9,9 +9,8 @@ def test_create_thing_type():
type_name = "my-type-name" type_name = "my-type-name"
thing_type = client.create_thing_type(thingTypeName=type_name) thing_type = client.create_thing_type(thingTypeName=type_name)
thing_type.should.have.key("thingTypeName").which.should.equal(type_name) assert thing_type["thingTypeName"] == type_name
thing_type.should.have.key("thingTypeArn") assert type_name in thing_type["thingTypeArn"]
thing_type["thingTypeArn"].should.contain(type_name)
@mock_iot @mock_iot
@ -22,11 +21,10 @@ def test_describe_thing_type():
client.create_thing_type(thingTypeName=type_name) client.create_thing_type(thingTypeName=type_name)
thing_type = client.describe_thing_type(thingTypeName=type_name) thing_type = client.describe_thing_type(thingTypeName=type_name)
thing_type.should.have.key("thingTypeName").which.should.equal(type_name) assert thing_type["thingTypeName"] == type_name
thing_type.should.have.key("thingTypeProperties") assert "thingTypeProperties" in thing_type
thing_type.should.have.key("thingTypeMetadata") assert "thingTypeMetadata" in thing_type
thing_type.should.have.key("thingTypeArn") assert type_name in thing_type["thingTypeArn"]
thing_type["thingTypeArn"].should.contain(type_name)
@mock_iot @mock_iot
@ -37,16 +35,16 @@ def test_list_thing_types():
client.create_thing_type(thingTypeName=str(i + 1)) client.create_thing_type(thingTypeName=str(i + 1))
thing_types = client.list_thing_types() thing_types = client.list_thing_types()
thing_types.should.have.key("nextToken") assert "nextToken" in thing_types
thing_types.should.have.key("thingTypes").which.should.have.length_of(50) assert len(thing_types["thingTypes"]) == 50
thing_types["thingTypes"][0]["thingTypeName"].should.equal("1") assert thing_types["thingTypes"][0]["thingTypeName"] == "1"
thing_types["thingTypes"][-1]["thingTypeName"].should.equal("50") assert thing_types["thingTypes"][-1]["thingTypeName"] == "50"
thing_types = client.list_thing_types(nextToken=thing_types["nextToken"]) thing_types = client.list_thing_types(nextToken=thing_types["nextToken"])
thing_types.should.have.key("thingTypes").which.should.have.length_of(50) assert len(thing_types["thingTypes"]) == 50
thing_types.should_not.have.key("nextToken") assert "nextToken" not in thing_types
thing_types["thingTypes"][0]["thingTypeName"].should.equal("51") assert thing_types["thingTypes"][0]["thingTypeName"] == "51"
thing_types["thingTypes"][-1]["thingTypeName"].should.equal("100") assert thing_types["thingTypes"][-1]["thingTypeName"] == "100"
@mock_iot @mock_iot
@ -61,16 +59,16 @@ def test_list_thing_types_with_typename_filter():
client.create_thing_type(thingTypeName="find me it shall not") client.create_thing_type(thingTypeName="find me it shall not")
thing_types = client.list_thing_types(thingTypeName="thing") thing_types = client.list_thing_types(thingTypeName="thing")
thing_types.should_not.have.key("nextToken") assert "nextToken" not in thing_types
thing_types.should.have.key("thingTypes").which.should.have.length_of(4) assert len(thing_types["thingTypes"]) == 4
thing_types["thingTypes"][0]["thingTypeName"].should.equal("thing") assert thing_types["thingTypes"][0]["thingTypeName"] == "thing"
thing_types["thingTypes"][-1]["thingTypeName"].should.equal("thingTypeNameGroup") assert thing_types["thingTypes"][-1]["thingTypeName"] == "thingTypeNameGroup"
thing_types = client.list_thing_types(thingTypeName="thingTypeName") thing_types = client.list_thing_types(thingTypeName="thingTypeName")
thing_types.should_not.have.key("nextToken") assert "nextToken" not in thing_types
thing_types.should.have.key("thingTypes").which.should.have.length_of(2) assert len(thing_types["thingTypes"]) == 2
thing_types["thingTypes"][0]["thingTypeName"].should.equal("thingTypeName") assert thing_types["thingTypes"][0]["thingTypeName"] == "thingTypeName"
thing_types["thingTypes"][-1]["thingTypeName"].should.equal("thingTypeNameGroup") assert thing_types["thingTypes"][-1]["thingTypeName"] == "thingTypeNameGroup"
@mock_iot @mock_iot
@ -83,4 +81,4 @@ def test_delete_thing_type():
# delete thing type # delete thing type
client.delete_thing_type(thingTypeName=type_name) client.delete_thing_type(thingTypeName=type_name)
res = client.list_thing_types() res = client.list_thing_types()
res.should.have.key("thingTypes").which.should.have.length_of(0) assert len(res["thingTypes"]) == 0

View File

@ -10,13 +10,13 @@ def test_create_thing():
name = "my-thing" name = "my-thing"
thing = client.create_thing(thingName=name) thing = client.create_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
res = client.list_things() res = client.list_things()
res.should.have.key("things").which.should.have.length_of(1) assert len(res["things"]) == 1
res["things"][0].should.have.key("thingName").which.should_not.be.none assert res["things"][0]["thingName"] is not None
res["things"][0].should.have.key("thingArn").which.should_not.be.none assert res["things"][0]["thingArn"] is not None
@mock_iot @mock_iot
@ -28,16 +28,16 @@ def test_create_thing_with_type():
client.create_thing_type(thingTypeName=type_name) client.create_thing_type(thingTypeName=type_name)
thing = client.create_thing(thingName=name, thingTypeName=type_name) thing = client.create_thing(thingName=name, thingTypeName=type_name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("thingArn") assert "thingArn" in thing
res = client.list_things() res = client.list_things()
res.should.have.key("things").which.should.have.length_of(1) assert len(res["things"]) == 1
res["things"][0].should.have.key("thingName").which.should_not.be.none assert res["things"][0]["thingName"] is not None
res["things"][0].should.have.key("thingArn").which.should_not.be.none assert res["things"][0]["thingArn"] is not None
thing = client.describe_thing(thingName=name) thing = client.describe_thing(thingName=name)
thing.should.have.key("thingTypeName").equals(type_name) assert thing["thingTypeName"] == type_name
@mock_iot @mock_iot
@ -49,10 +49,10 @@ def test_update_thing():
client.update_thing(thingName=name, attributePayload={"attributes": {"k1": "v1"}}) client.update_thing(thingName=name, attributePayload={"attributes": {"k1": "v1"}})
res = client.list_things() res = client.list_things()
res.should.have.key("things").which.should.have.length_of(1) assert len(res["things"]) == 1
res["things"][0].should.have.key("thingName").which.should_not.be.none assert res["things"][0]["thingName"] is not None
res["things"][0].should.have.key("thingArn").which.should_not.be.none assert res["things"][0]["thingArn"] is not None
res["things"][0]["attributes"].should.have.key("k1").which.should.equal("v1") assert res["things"][0]["attributes"]["k1"] == "v1"
@mock_iot @mock_iot
@ -64,10 +64,10 @@ def test_describe_thing():
client.update_thing(thingName=name, attributePayload={"attributes": {"k1": "v1"}}) client.update_thing(thingName=name, attributePayload={"attributes": {"k1": "v1"}})
thing = client.describe_thing(thingName=name) thing = client.describe_thing(thingName=name)
thing.should.have.key("thingName").which.should.equal(name) assert thing["thingName"] == name
thing.should.have.key("defaultClientId") assert "defaultClientId" in thing
thing.should.have.key("attributes").equals({"k1": "v1"}) assert thing["attributes"] == {"k1": "v1"}
thing.should.have.key("version").equals(1) assert thing["version"] == 1
@mock_iot @mock_iot
@ -81,7 +81,7 @@ def test_delete_thing():
client.delete_thing(thingName=name) client.delete_thing(thingName=name)
res = client.list_things() res = client.list_things()
res.should.have.key("things").which.should.have.length_of(0) assert len(res["things"]) == 0
@mock_iot @mock_iot
@ -92,51 +92,56 @@ def test_list_things_with_next_token():
client.create_thing(thingName=str(i + 1)) client.create_thing(thingName=str(i + 1))
things = client.list_things() things = client.list_things()
things.should.have.key("nextToken") assert len(things["things"]) == 50
things.should.have.key("things").which.should.have.length_of(50) assert things["things"][0]["thingName"] == "1"
things["things"][0]["thingName"].should.equal("1") assert (
things["things"][0]["thingArn"].should.equal( things["things"][0]["thingArn"]
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/1" == f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/1"
) )
things["things"][-1]["thingName"].should.equal("50") assert things["things"][-1]["thingName"] == "50"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/50" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/50"
) )
things = client.list_things(nextToken=things["nextToken"]) things = client.list_things(nextToken=things["nextToken"])
things.should.have.key("nextToken") assert len(things["things"]) == 50
things.should.have.key("things").which.should.have.length_of(50) assert things["things"][0]["thingName"] == "51"
things["things"][0]["thingName"].should.equal("51") assert (
things["things"][0]["thingArn"].should.equal( things["things"][0]["thingArn"]
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/51" == f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/51"
) )
things["things"][-1]["thingName"].should.equal("100") assert things["things"][-1]["thingName"] == "100"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/100" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/100"
) )
things = client.list_things(nextToken=things["nextToken"]) things = client.list_things(nextToken=things["nextToken"])
things.should.have.key("nextToken") assert len(things["things"]) == 50
things.should.have.key("things").which.should.have.length_of(50) assert things["things"][0]["thingName"] == "101"
things["things"][0]["thingName"].should.equal("101") assert (
things["things"][0]["thingArn"].should.equal( things["things"][0]["thingArn"]
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/101" == f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/101"
) )
things["things"][-1]["thingName"].should.equal("150") assert things["things"][-1]["thingName"] == "150"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/150" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/150"
) )
things = client.list_things(nextToken=things["nextToken"]) things = client.list_things(nextToken=things["nextToken"])
things.should_not.have.key("nextToken") assert "nextToken" not in things
things.should.have.key("things").which.should.have.length_of(50) assert len(things["things"]) == 50
things["things"][0]["thingName"].should.equal("151") assert things["things"][0]["thingName"] == "151"
things["things"][0]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/151" things["things"][0]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/151"
) )
things["things"][-1]["thingName"].should.equal("200") assert things["things"][-1]["thingName"] == "200"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/200" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/200"
) )
@ -167,77 +172,87 @@ def test_list_things_with_attribute_and_thing_type_filter_and_next_token():
# Test filter for thingTypeName # Test filter for thingTypeName
things = client.list_things(thingTypeName=thing_type_name) things = client.list_things(thingTypeName=thing_type_name)
things.should.have.key("nextToken") assert "nextToken" in things
things.should.have.key("things").which.should.have.length_of(50) assert len(things["things"]) == 50
things["things"][0]["thingName"].should.equal("2") assert things["things"][0]["thingName"] == "2"
things["things"][0]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/2" things["things"][0]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/2"
) )
things["things"][-1]["thingName"].should.equal("100") assert things["things"][-1]["thingName"] == "100"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/100" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/100"
) )
all(item["thingTypeName"] == thing_type_name for item in things["things"]) assert all(item["thingTypeName"] == thing_type_name for item in things["things"])
things = client.list_things( things = client.list_things(
nextToken=things["nextToken"], thingTypeName=thing_type_name nextToken=things["nextToken"], thingTypeName=thing_type_name
) )
things.should_not.have.key("nextToken") assert "nextToken" not in things
things.should.have.key("things").which.should.have.length_of(50) assert len(things["things"]) == 50
things["things"][0]["thingName"].should.equal("102") assert things["things"][0]["thingName"] == "102"
things["things"][0]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/102" things["things"][0]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/102"
) )
things["things"][-1]["thingName"].should.equal("200") assert things["things"][-1]["thingName"] == "200"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/200" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/200"
) )
all(item["thingTypeName"] == thing_type_name for item in things["things"]) assert all(item["thingTypeName"] == thing_type_name for item in things["things"])
# Test filter for attributes # Test filter for attributes
things = client.list_things(attributeName="foo", attributeValue="bar") things = client.list_things(attributeName="foo", attributeValue="bar")
things.should.have.key("nextToken") assert "nextToken" in things
things.should.have.key("things").which.should.have.length_of(50) assert len(things["things"]) == 50
things["things"][0]["thingName"].should.equal("3") assert things["things"][0]["thingName"] == "3"
things["things"][0]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/3" things["things"][0]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/3"
) )
things["things"][-1]["thingName"].should.equal("150") assert things["things"][-1]["thingName"] == "150"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/150" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/150"
) )
all(item["attributes"] == {"foo": "bar"} for item in things["things"]) assert all(item["attributes"] == {"foo": "bar"} for item in things["things"])
things = client.list_things( things = client.list_things(
nextToken=things["nextToken"], attributeName="foo", attributeValue="bar" nextToken=things["nextToken"], attributeName="foo", attributeValue="bar"
) )
things.should_not.have.key("nextToken") assert "nextToken" not in things
things.should.have.key("things").which.should.have.length_of(16) assert len(things["things"]) == 16
things["things"][0]["thingName"].should.equal("153") assert things["things"][0]["thingName"] == "153"
things["things"][0]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/153" things["things"][0]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/153"
) )
things["things"][-1]["thingName"].should.equal("198") assert things["things"][-1]["thingName"] == "198"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/198" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/198"
) )
all(item["attributes"] == {"foo": "bar"} for item in things["things"]) assert all(item["attributes"] == {"foo": "bar"} for item in things["things"])
# Test filter for attributes and thingTypeName # Test filter for attributes and thingTypeName
things = client.list_things( things = client.list_things(
thingTypeName=thing_type_name, attributeName="foo", attributeValue="bar" thingTypeName=thing_type_name, attributeName="foo", attributeValue="bar"
) )
things.should_not.have.key("nextToken") assert "nextToken" not in things
things.should.have.key("things").which.should.have.length_of(33) assert len(things["things"]) == 33
things["things"][0]["thingName"].should.equal("6") assert things["things"][0]["thingName"] == "6"
things["things"][0]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/6" things["things"][0]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/6"
) )
things["things"][-1]["thingName"].should.equal("198") assert things["things"][-1]["thingName"] == "198"
things["things"][-1]["thingArn"].should.equal( assert (
f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/198" things["things"][-1]["thingArn"]
== f"arn:aws:iot:ap-northeast-1:{ACCOUNT_ID}:thing/198"
) )
all( assert all(
item["attributes"] == {"foo": "bar"} item["attributes"] == {"foo": "bar"}
and item["thingTypeName"] == thing_type_name and item["thingTypeName"] == thing_type_name
for item in things["things"] for item in things["things"]

View File

@ -29,7 +29,7 @@ def test_topic_rule_create():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.create_topic_rule(ruleName=name, topicRulePayload=payload) client.create_topic_rule(ruleName=name, topicRulePayload=payload)
error_code = ex.value.response["Error"]["Code"] error_code = ex.value.response["Error"]["Code"]
error_code.should.equal("ResourceAlreadyExistsException") assert error_code == "ResourceAlreadyExistsException"
@mock_iot @mock_iot
@ -38,19 +38,19 @@ def test_topic_rule_list():
# empty response # empty response
res = client.list_topic_rules() res = client.list_topic_rules()
res.should.have.key("rules").which.should.have.length_of(0) assert len(res["rules"]) == 0
client.create_topic_rule(ruleName=name, topicRulePayload=payload) client.create_topic_rule(ruleName=name, topicRulePayload=payload)
client.create_topic_rule(ruleName="my-rule-2", topicRulePayload=payload) client.create_topic_rule(ruleName="my-rule-2", topicRulePayload=payload)
res = client.list_topic_rules() res = client.list_topic_rules()
res.should.have.key("rules").which.should.have.length_of(2) assert len(res["rules"]) == 2
for rule, rule_name in zip(res["rules"], [name, "my-rule-2"]): for rule, rule_name in zip(res["rules"], [name, "my-rule-2"]):
rule.should.have.key("ruleName").which.should.equal(rule_name) assert rule["ruleName"] == rule_name
rule.should.have.key("createdAt").which.should_not.be.none assert rule["createdAt"] is not None
rule.should.have.key("ruleArn").which.should_not.be.none assert rule["ruleArn"] is not None
rule.should.have.key("ruleDisabled").which.should.equal(payload["ruleDisabled"]) assert rule["ruleDisabled"] == payload["ruleDisabled"]
rule.should.have.key("topicPattern").which.should.equal("topic/*") assert rule["topicPattern"] == "topic/*"
@mock_iot @mock_iot
@ -61,25 +61,22 @@ def test_topic_rule_get():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.get_topic_rule(ruleName=name) client.get_topic_rule(ruleName=name)
error_code = ex.value.response["Error"]["Code"] error_code = ex.value.response["Error"]["Code"]
error_code.should.equal("ResourceNotFoundException") assert error_code == "ResourceNotFoundException"
client.create_topic_rule(ruleName=name, topicRulePayload=payload) client.create_topic_rule(ruleName=name, topicRulePayload=payload)
rule = client.get_topic_rule(ruleName=name) rule = client.get_topic_rule(ruleName=name)
rule.should.have.key("ruleArn").which.should_not.be.none assert rule["ruleArn"] is not None
rule.should.have.key("rule")
rrule = rule["rule"] rrule = rule["rule"]
rrule.should.have.key("actions").which.should.equal(payload["actions"]) assert rrule["actions"] == payload["actions"]
rrule.should.have.key("awsIotSqlVersion").which.should.equal( assert rrule["awsIotSqlVersion"] == payload["awsIotSqlVersion"]
payload["awsIotSqlVersion"] assert rrule["createdAt"] is not None
) assert rrule["description"] == payload["description"]
rrule.should.have.key("createdAt").which.should_not.be.none assert rrule["errorAction"] == payload["errorAction"]
rrule.should.have.key("description").which.should.equal(payload["description"]) assert rrule["ruleDisabled"] == payload["ruleDisabled"]
rrule.should.have.key("errorAction").which.should.equal(payload["errorAction"]) assert rrule["ruleName"] == name
rrule.should.have.key("ruleDisabled").which.should.equal(payload["ruleDisabled"]) assert rrule["sql"] == payload["sql"]
rrule.should.have.key("ruleName").which.should.equal(name)
rrule.should.have.key("sql").which.should.equal(payload["sql"])
@mock_iot @mock_iot
@ -90,7 +87,7 @@ def test_topic_rule_replace():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.replace_topic_rule(ruleName=name, topicRulePayload=payload) client.replace_topic_rule(ruleName=name, topicRulePayload=payload)
error_code = ex.value.response["Error"]["Code"] error_code = ex.value.response["Error"]["Code"]
error_code.should.equal("ResourceNotFoundException") assert error_code == "ResourceNotFoundException"
client.create_topic_rule(ruleName=name, topicRulePayload=payload) client.create_topic_rule(ruleName=name, topicRulePayload=payload)
@ -99,8 +96,8 @@ def test_topic_rule_replace():
client.replace_topic_rule(ruleName=name, topicRulePayload=my_payload) client.replace_topic_rule(ruleName=name, topicRulePayload=my_payload)
rule = client.get_topic_rule(ruleName=name) rule = client.get_topic_rule(ruleName=name)
rule["rule"]["ruleName"].should.equal(name) assert rule["rule"]["ruleName"] == name
rule["rule"]["description"].should.equal(my_payload["description"]) assert rule["rule"]["description"] == my_payload["description"]
@mock_iot @mock_iot
@ -111,15 +108,15 @@ def test_topic_rule_disable():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.disable_topic_rule(ruleName=name) client.disable_topic_rule(ruleName=name)
error_code = ex.value.response["Error"]["Code"] error_code = ex.value.response["Error"]["Code"]
error_code.should.equal("ResourceNotFoundException") assert error_code == "ResourceNotFoundException"
client.create_topic_rule(ruleName=name, topicRulePayload=payload) client.create_topic_rule(ruleName=name, topicRulePayload=payload)
client.disable_topic_rule(ruleName=name) client.disable_topic_rule(ruleName=name)
rule = client.get_topic_rule(ruleName=name) rule = client.get_topic_rule(ruleName=name)
rule["rule"]["ruleName"].should.equal(name) assert rule["rule"]["ruleName"] == name
rule["rule"]["ruleDisabled"].should.equal(True) assert rule["rule"]["ruleDisabled"] is True
@mock_iot @mock_iot
@ -130,7 +127,7 @@ def test_topic_rule_enable():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.enable_topic_rule(ruleName=name) client.enable_topic_rule(ruleName=name)
error_code = ex.value.response["Error"]["Code"] error_code = ex.value.response["Error"]["Code"]
error_code.should.equal("ResourceNotFoundException") assert error_code == "ResourceNotFoundException"
my_payload = payload.copy() my_payload = payload.copy()
my_payload["ruleDisabled"] = True my_payload["ruleDisabled"] = True
@ -139,8 +136,8 @@ def test_topic_rule_enable():
client.enable_topic_rule(ruleName=name) client.enable_topic_rule(ruleName=name)
rule = client.get_topic_rule(ruleName=name) rule = client.get_topic_rule(ruleName=name)
rule["rule"]["ruleName"].should.equal(name) assert rule["rule"]["ruleName"] == name
rule["rule"]["ruleDisabled"].should.equal(False) assert rule["rule"]["ruleDisabled"] is False
@mock_iot @mock_iot
@ -151,7 +148,7 @@ def test_topic_rule_delete():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.delete_topic_rule(ruleName=name) client.delete_topic_rule(ruleName=name)
error_code = ex.value.response["Error"]["Code"] error_code = ex.value.response["Error"]["Code"]
error_code.should.equal("ResourceNotFoundException") assert error_code == "ResourceNotFoundException"
client.create_topic_rule(ruleName=name, topicRulePayload=payload) client.create_topic_rule(ruleName=name, topicRulePayload=payload)
@ -160,4 +157,4 @@ def test_topic_rule_delete():
client.delete_topic_rule(ruleName=name) client.delete_topic_rule(ruleName=name)
res = client.list_topic_rules() res = client.list_topic_rules()
res.should.have.key("rules").which.should.have.length_of(0) assert len(res["rules"]) == 0

View File

@ -1,8 +1,7 @@
import json import json
from urllib.parse import quote
import pytest import pytest
import sure # noqa # pylint: disable=unused-import
from urllib.parse import quote
import moto.server as server import moto.server as server
from moto import mock_iot from moto import mock_iot
@ -19,7 +18,7 @@ def test_iot_list():
# just making sure that server is up # just making sure that server is up
res = test_client.get("/things") res = test_client.get("/things")
res.status_code.should.equal(200) assert res.status_code == 200
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -45,6 +44,6 @@ def test_list_attached_policies(url_encode_arn):
certificate_arn = quote(certificate_arn, safe="") certificate_arn = quote(certificate_arn, safe="")
result = test_client.post(f"/attached-policies/{certificate_arn}") result = test_client.post(f"/attached-policies/{certificate_arn}")
result.status_code.should.equal(200) assert result.status_code == 200
result_dict = json.loads(result.data.decode("utf-8")) result_dict = json.loads(result.data.decode("utf-8"))
result_dict["policies"][0]["policyName"].should.equal("my-policy") assert result_dict["policies"][0]["policyName"] == "my-policy"