IOT - add list_targets_for_policy action to IoTBackend (#5309)

This commit is contained in:
Jonas 2022-08-01 11:14:47 +02:00 committed by GitHub
parent f701875334
commit 201b61455b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 2 deletions

View File

@ -3483,7 +3483,7 @@
- [ ] list_security_profiles_for_target
- [ ] list_streams
- [ ] list_tags_for_resource
- [ ] list_targets_for_policy
- [x] list_targets_for_policy
- [ ] list_targets_for_security_profile
- [X] list_thing_groups
- [X] list_thing_groups_for_thing

View File

@ -1163,11 +1163,20 @@ class IoTBackend(BaseBackend):
return policies
def list_policy_principals(self, policy_name):
# this action is deprecated
# https://docs.aws.amazon.com/iot/latest/apireference/API_ListTargetsForPolicy.html
# should use ListTargetsForPolicy instead
principals = [
k[0] for k, v in self.principal_policies.items() if k[1] == policy_name
]
return principals
def list_targets_for_policy(self, policy_name):
# This behaviour is different to list_policy_principals which will just return an empty list
if policy_name not in self.policies:
raise ResourceNotFoundException("Policy not found")
return self.list_policy_principals(policy_name=policy_name)
def attach_thing_principal(self, thing_name, principal_arn):
principal = self._get_principal(principal_arn)
thing = self.describe_thing(thing_name)

View File

@ -538,6 +538,12 @@ class IoTResponse(BaseResponse):
next_marker = None
return json.dumps(dict(principals=principals, nextMarker=next_marker))
def list_targets_for_policy(self):
"""https://docs.aws.amazon.com/iot/latest/apireference/API_ListTargetsForPolicy.html"""
policy_name = self._get_param("policyName")
principals = self.iot_backend.list_targets_for_policy(policy_name=policy_name)
return json.dumps(dict(targets=principals, nextMarker=None))
def attach_thing_principal(self):
thing_name = self._get_param("thingName")
principal = self.headers.get("x-amzn-principal")

View File

@ -377,3 +377,40 @@ def test_policy_delete_fails_when_versions_exist(iot_client, policy):
e.value.response["Error"]["Message"].should.contain(
"Cannot delete the policy because it has one or more policy versions attached to it"
)
def test_list_targets_for_policy_empty(iot_client, policy):
res = iot_client.list_targets_for_policy(policyName=policy["policyName"])
res.should.have.key("targets").which.should.have.length_of(0)
def test_list_targets_for_policy_one_attached_thing_group(iot_client, policy):
thing_group = iot_client.create_thing_group(thingGroupName="my-thing-group")
thing_group_arn = thing_group["thingGroupArn"]
policy_name = policy["policyName"]
iot_client.attach_policy(policyName=policy_name, target=thing_group_arn)
res = iot_client.list_targets_for_policy(policyName=policy["policyName"])
res.should.have.key("targets").which.should.have.length_of(1)
res["targets"][0].should.equal(thing_group_arn)
def test_list_targets_for_policy_one_attached_certificate(iot_client, policy):
cert = iot_client.create_keys_and_certificate(setAsActive=True)
cert_arn = cert["certificateArn"]
policy_name = policy["policyName"]
iot_client.attach_policy(policyName=policy_name, target=cert_arn)
res = iot_client.list_targets_for_policy(policyName=policy["policyName"])
res.should.have.key("targets").which.should.have.length_of(1)
res["targets"][0].should.equal(cert_arn)
def test_list_targets_for_policy_resource_not_found(iot_client):
with pytest.raises(ClientError) as e:
iot_client.list_targets_for_policy(policyName="NON_EXISTENT_POLICY_NAME")
e.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
e.value.response["Error"]["Message"].should.contain("Policy not found")