diff --git a/moto/iot/exceptions.py b/moto/iot/exceptions.py index e3acf9690..46e6b93ee 100644 --- a/moto/iot/exceptions.py +++ b/moto/iot/exceptions.py @@ -60,3 +60,12 @@ class ResourceAlreadyExistsException(IoTClientError): super(ResourceAlreadyExistsException, self).__init__( "ResourceAlreadyExistsException", msg or "The resource already exists." ) + + +class VersionsLimitExceededException(IoTClientError): + def __init__(self, name): + self.code = 409 + super(VersionsLimitExceededException, self).__init__( + "VersionsLimitExceededException", + "The policy %s already has the maximum number of versions (5)" % name, + ) diff --git a/moto/iot/models.py b/moto/iot/models.py index 2af6eec9b..c1349224f 100644 --- a/moto/iot/models.py +++ b/moto/iot/models.py @@ -21,6 +21,7 @@ from .exceptions import ( InvalidStateTransitionException, VersionConflictException, ResourceAlreadyExistsException, + VersionsLimitExceededException, ) @@ -815,6 +816,8 @@ class IoTBackend(BaseBackend): policy = self.get_policy(policy_name) if not policy: raise ResourceNotFoundException() + if len(policy.versions) >= 5: + raise VersionsLimitExceededException(policy_name) version = FakePolicyVersion( policy_name, policy_document, set_as_default, self.region_name ) diff --git a/tests/test_iot/test_iot.py b/tests/test_iot/test_iot.py index 9bd0fbd52..1aa955313 100644 --- a/tests/test_iot/test_iot.py +++ b/tests/test_iot/test_iot.py @@ -139,8 +139,32 @@ def test_policy_versions(): policy1["policyVersionId"] ) + policy3 = client.create_policy_version( + policyName=policy_name, + policyDocument=json.dumps({"version": "version_3"}), + setAsDefault=False, + ) + policy3.should.have.key("policyArn").which.should_not.be.none + policy3.should.have.key("policyDocument").which.should.equal( + json.dumps({"version": "version_3"}) + ) + policy3.should.have.key("policyVersionId").which.should.equal("4") + policy3.should.have.key("isDefaultVersion").which.should.equal(False) + + policy4 = client.create_policy_version( + policyName=policy_name, + policyDocument=json.dumps({"version": "version_4"}), + setAsDefault=False, + ) + policy4.should.have.key("policyArn").which.should_not.be.none + policy4.should.have.key("policyDocument").which.should.equal( + json.dumps({"version": "version_4"}) + ) + policy4.should.have.key("policyVersionId").which.should.equal("5") + policy4.should.have.key("isDefaultVersion").which.should.equal(False) + policy_versions = client.list_policy_versions(policyName=policy_name) - policy_versions.should.have.key("policyVersions").which.should.have.length_of(3) + policy_versions.should.have.key("policyVersions").which.should.have.length_of(5) list( map(lambda item: item["isDefaultVersion"], policy_versions["policyVersions"]) ).count(True).should.equal(1) @@ -162,10 +186,10 @@ def test_policy_versions(): ) client.set_default_policy_version( - policyName=policy_name, policyVersionId=policy2["policyVersionId"] + policyName=policy_name, policyVersionId=policy4["policyVersionId"] ) policy_versions = client.list_policy_versions(policyName=policy_name) - policy_versions.should.have.key("policyVersions").which.should.have.length_of(3) + policy_versions.should.have.key("policyVersions").which.should.have.length_of(5) list( map(lambda item: item["isDefaultVersion"], policy_versions["policyVersions"]) ).count(True).should.equal(1) @@ -173,25 +197,48 @@ def test_policy_versions(): filter(lambda item: item["isDefaultVersion"], policy_versions["policyVersions"]) ) default_policy[0].should.have.key("versionId").should.equal( - policy2["policyVersionId"] + policy4["policyVersionId"] ) policy = client.get_policy(policyName=policy_name) policy.should.have.key("policyName").which.should.equal(policy_name) policy.should.have.key("policyArn").which.should_not.be.none policy.should.have.key("policyDocument").which.should.equal( - json.dumps({"version": "version_2"}) + json.dumps({"version": "version_4"}) ) policy.should.have.key("defaultVersionId").which.should.equal( - policy2["policyVersionId"] + policy4["policyVersionId"] ) + try: + client.create_policy_version( + policyName=policy_name, + policyDocument=json.dumps({"version": "version_5"}), + setAsDefault=False, + ) + assert False, "Should have failed in previous call" + except Exception as exception: + exception.response["Error"]["Message"].should.equal( + "The policy %s already has the maximum number of versions (5)" % policy_name + ) + client.delete_policy_version(policyName=policy_name, policyVersionId="1") policy_versions = client.list_policy_versions(policyName=policy_name) + policy_versions.should.have.key("policyVersions").which.should.have.length_of(4) + + client.delete_policy_version( + policyName=policy_name, policyVersionId=policy1["policyVersionId"] + ) + policy_versions = client.list_policy_versions(policyName=policy_name) + policy_versions.should.have.key("policyVersions").which.should.have.length_of(3) + client.delete_policy_version( + policyName=policy_name, policyVersionId=policy2["policyVersionId"] + ) + policy_versions = client.list_policy_versions(policyName=policy_name) policy_versions.should.have.key("policyVersions").which.should.have.length_of(2) client.delete_policy_version( - policyName=policy_name, policyVersionId=policy1["policyVersionId"] + policyName=policy_name, policyVersionId=policy3["policyVersionId"] ) policy_versions = client.list_policy_versions(policyName=policy_name) policy_versions.should.have.key("policyVersions").which.should.have.length_of(1) @@ -199,7 +246,7 @@ def test_policy_versions(): # should fail as it"s the default policy. Should use delete_policy instead try: client.delete_policy_version( - policyName=policy_name, policyVersionId=policy2["policyVersionId"] + policyName=policy_name, policyVersionId=policy4["policyVersionId"] ) assert False, "Should have failed in previous call" except Exception as exception: