Merge pull request #1901 from cm-iwata/add_iot_attach_policy

Add support for IoT attach_policy
This commit is contained in:
Steve Pulec 2018-12-28 20:17:31 -05:00 committed by GitHub
commit 19bdf7de73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 79 additions and 3 deletions

View File

@ -2376,11 +2376,11 @@
- [ ] unsubscribe_from_event
- [ ] update_assessment_target
## iot - 30% implemented
## iot - 32% implemented
- [ ] accept_certificate_transfer
- [X] add_thing_to_thing_group
- [ ] associate_targets_with_job
- [ ] attach_policy
- [X] attach_policy
- [X] attach_principal_policy
- [X] attach_thing_principal
- [ ] cancel_certificate_transfer
@ -2429,7 +2429,7 @@
- [X] describe_thing_group
- [ ] describe_thing_registration_task
- [X] describe_thing_type
- [ ] detach_policy
- [X] detach_policy
- [X] detach_principal_policy
- [X] detach_thing_principal
- [ ] disable_topic_rule

View File

@ -457,6 +457,14 @@ class IoTBackend(BaseBackend):
pass
raise ResourceNotFoundException()
def attach_policy(self, policy_name, target):
principal = self._get_principal(target)
policy = self.get_policy(policy_name)
k = (target, policy_name)
if k in self.principal_policies:
return
self.principal_policies[k] = (principal, policy)
def attach_principal_policy(self, policy_name, principal_arn):
principal = self._get_principal(principal_arn)
policy = self.get_policy(policy_name)
@ -465,6 +473,15 @@ class IoTBackend(BaseBackend):
return
self.principal_policies[k] = (principal, policy)
def detach_policy(self, policy_name, target):
# this may raises ResourceNotFoundException
self._get_principal(target)
self.get_policy(policy_name)
k = (target, policy_name)
if k not in self.principal_policies:
raise ResourceNotFoundException()
del self.principal_policies[k]
def detach_principal_policy(self, policy_name, principal_arn):
# this may raises ResourceNotFoundException
self._get_principal(principal_arn)

View File

@ -224,6 +224,15 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def attach_policy(self):
policy_name = self._get_param("policyName")
target = self._get_param('target')
self.iot_backend.attach_policy(
policy_name=policy_name,
target=target,
)
return json.dumps(dict())
def attach_principal_policy(self):
policy_name = self._get_param("policyName")
principal = self.headers.get('x-amzn-iot-principal')
@ -233,6 +242,15 @@ class IoTResponse(BaseResponse):
)
return json.dumps(dict())
def detach_policy(self):
policy_name = self._get_param("policyName")
target = self._get_param('target')
self.iot_backend.detach_policy(
policy_name=policy_name,
target=target,
)
return json.dumps(dict())
def detach_principal_policy(self):
policy_name = self._get_param("policyName")
principal = self.headers.get('x-amzn-iot-principal')

View File

@ -401,6 +401,47 @@ def test_policy():
@mock_iot
def test_principal_policy():
client = boto3.client('iot', region_name='ap-northeast-1')
policy_name = 'my-policy'
doc = '{}'
client.create_policy(policyName=policy_name, policyDocument=doc)
cert = client.create_keys_and_certificate(setAsActive=True)
cert_arn = cert['certificateArn']
client.attach_policy(policyName=policy_name, target=cert_arn)
res = client.list_principal_policies(principal=cert_arn)
res.should.have.key('policies').which.should.have.length_of(1)
for policy in res['policies']:
policy.should.have.key('policyName').which.should_not.be.none
policy.should.have.key('policyArn').which.should_not.be.none
# do nothing if policy have already attached to certificate
client.attach_policy(policyName=policy_name, target=cert_arn)
res = client.list_principal_policies(principal=cert_arn)
res.should.have.key('policies').which.should.have.length_of(1)
for policy in res['policies']:
policy.should.have.key('policyName').which.should_not.be.none
policy.should.have.key('policyArn').which.should_not.be.none
res = client.list_policy_principals(policyName=policy_name)
res.should.have.key('principals').which.should.have.length_of(1)
for principal in res['principals']:
principal.should_not.be.none
client.detach_policy(policyName=policy_name, target=cert_arn)
res = client.list_principal_policies(principal=cert_arn)
res.should.have.key('policies').which.should.have.length_of(0)
res = client.list_policy_principals(policyName=policy_name)
res.should.have.key('principals').which.should.have.length_of(0)
with assert_raises(ClientError) as e:
client.detach_policy(policyName=policy_name, target=cert_arn)
e.exception.response['Error']['Code'].should.equal('ResourceNotFoundException')
@mock_iot
def test_principal_policy_deprecated():
client = boto3.client('iot', region_name='ap-northeast-1')
policy_name = 'my-policy'
doc = '{}'