[Resolves #2196] - endpoints for querying organizations SC policies (#2197)

adding support for organizations service control policies

* [Resolves #2196] - endpoints for querying organizations SC policies

I have added the following mock endpoints to the Organizations service:

- create_policy
- list_policies
- describe_policy
- attach_policy
- list_policies_for_target
- list_targets_for_policy
This commit is contained in:
Ashley Gould 2019-05-25 02:20:19 -07:00 committed by Terry Cain
parent a61124f774
commit a3f6d2c110
6 changed files with 508 additions and 26 deletions

View File

@ -3098,14 +3098,14 @@
- [ ] update_server
- [ ] update_server_engine_attributes
## organizations - 30% implemented
## organizations - 47% implemented
- [ ] accept_handshake
- [ ] attach_policy
- [X] attach_policy
- [ ] cancel_handshake
- [X] create_account
- [X] create_organization
- [X] create_organizational_unit
- [ ] create_policy
- [X] create_policy
- [ ] decline_handshake
- [ ] delete_organization
- [ ] delete_organizational_unit
@ -3115,7 +3115,7 @@
- [ ] describe_handshake
- [X] describe_organization
- [X] describe_organizational_unit
- [ ] describe_policy
- [X] describe_policy
- [ ] detach_policy
- [ ] disable_aws_service_access
- [ ] disable_policy_type
@ -3133,10 +3133,10 @@
- [ ] list_handshakes_for_organization
- [X] list_organizational_units_for_parent
- [X] list_parents
- [ ] list_policies
- [ ] list_policies_for_target
- [X] list_policies
- [X] list_policies_for_target
- [X] list_roots
- [ ] list_targets_for_policy
- [X] list_targets_for_policy
- [X] move_account
- [ ] remove_account_from_organization
- [ ] update_organizational_unit

View File

@ -47,6 +47,7 @@ class FakeOrganization(BaseModel):
class FakeAccount(BaseModel):
def __init__(self, organization, **kwargs):
self.type = 'ACCOUNT'
self.organization_id = organization.id
self.master_account_id = organization.master_account_id
self.create_account_status_id = utils.make_random_create_account_status_id()
@ -57,6 +58,7 @@ class FakeAccount(BaseModel):
self.status = 'ACTIVE'
self.joined_method = 'CREATED'
self.parent_id = organization.root_id
self.attached_policies = []
@property
def arn(self):
@ -103,6 +105,7 @@ class FakeOrganizationalUnit(BaseModel):
self.name = kwargs.get('Name')
self.parent_id = kwargs.get('ParentId')
self._arn_format = utils.OU_ARN_FORMAT
self.attached_policies = []
@property
def arn(self):
@ -134,6 +137,7 @@ class FakeRoot(FakeOrganizationalUnit):
'Status': 'ENABLED'
}]
self._arn_format = utils.ROOT_ARN_FORMAT
self.attached_policies = []
def describe(self):
return {
@ -144,12 +148,52 @@ class FakeRoot(FakeOrganizationalUnit):
}
class FakeServiceControlPolicy(BaseModel):
def __init__(self, organization, **kwargs):
self.type = 'POLICY'
self.content = kwargs.get('Content')
self.description = kwargs.get('Description')
self.name = kwargs.get('Name')
self.type = kwargs.get('Type')
self.id = utils.make_random_service_control_policy_id()
self.aws_managed = False
self.organization_id = organization.id
self.master_account_id = organization.master_account_id
self._arn_format = utils.SCP_ARN_FORMAT
self.attachments = []
@property
def arn(self):
return self._arn_format.format(
self.master_account_id,
self.organization_id,
self.id
)
def describe(self):
return {
'Policy': {
'PolicySummary': {
'Id': self.id,
'Arn': self.arn,
'Name': self.name,
'Description': self.description,
'Type': self.type,
'AwsManaged': self.aws_managed,
},
'Content': self.content
}
}
class OrganizationsBackend(BaseBackend):
def __init__(self):
self.org = None
self.accounts = []
self.ou = []
self.policies = []
def create_organization(self, **kwargs):
self.org = FakeOrganization(kwargs['FeatureSet'])
@ -292,5 +336,108 @@ class OrganizationsBackend(BaseBackend):
]
)
def create_policy(self, **kwargs):
new_policy = FakeServiceControlPolicy(self.org, **kwargs)
self.policies.append(new_policy)
return new_policy.describe()
def describe_policy(self, **kwargs):
if re.compile(utils.SCP_ID_REGEX).match(kwargs['PolicyId']):
policy = next((p for p in self.policies if p.id == kwargs['PolicyId']), None)
if policy is None:
raise RESTError(
'PolicyNotFoundException',
"You specified a policy that doesn't exist."
)
else:
raise RESTError(
'InvalidInputException',
'You specified an invalid value.'
)
return policy.describe()
def attach_policy(self, **kwargs):
policy = next((p for p in self.policies if p.id == kwargs['PolicyId']), None)
if (re.compile(utils.ROOT_ID_REGEX).match(kwargs['TargetId']) or
re.compile(utils.OU_ID_REGEX).match(kwargs['TargetId'])):
ou = next((ou for ou in self.ou if ou.id == kwargs['TargetId']), None)
if ou is not None:
if ou not in ou.attached_policies:
ou.attached_policies.append(policy)
policy.attachments.append(ou)
else:
raise RESTError(
'OrganizationalUnitNotFoundException',
"You specified an organizational unit that doesn't exist."
)
elif re.compile(utils.ACCOUNT_ID_REGEX).match(kwargs['TargetId']):
account = next((a for a in self.accounts if a.id == kwargs['TargetId']), None)
if account is not None:
if account not in account.attached_policies:
account.attached_policies.append(policy)
policy.attachments.append(account)
else:
raise RESTError(
'AccountNotFoundException',
"You specified an account that doesn't exist."
)
else:
raise RESTError(
'InvalidInputException',
'You specified an invalid value.'
)
def list_policies(self, **kwargs):
return dict(Policies=[
p.describe()['Policy']['PolicySummary'] for p in self.policies
])
def list_policies_for_target(self, **kwargs):
if re.compile(utils.OU_ID_REGEX).match(kwargs['TargetId']):
obj = next((ou for ou in self.ou if ou.id == kwargs['TargetId']), None)
if obj is None:
raise RESTError(
'OrganizationalUnitNotFoundException',
"You specified an organizational unit that doesn't exist."
)
elif re.compile(utils.ACCOUNT_ID_REGEX).match(kwargs['TargetId']):
obj = next((a for a in self.accounts if a.id == kwargs['TargetId']), None)
if obj is None:
raise RESTError(
'AccountNotFoundException',
"You specified an account that doesn't exist."
)
else:
raise RESTError(
'InvalidInputException',
'You specified an invalid value.'
)
return dict(Policies=[
p.describe()['Policy']['PolicySummary'] for p in obj.attached_policies
])
def list_targets_for_policy(self, **kwargs):
if re.compile(utils.SCP_ID_REGEX).match(kwargs['PolicyId']):
policy = next((p for p in self.policies if p.id == kwargs['PolicyId']), None)
if policy is None:
raise RESTError(
'PolicyNotFoundException',
"You specified a policy that doesn't exist."
)
else:
raise RESTError(
'InvalidInputException',
'You specified an invalid value.'
)
objects = [
{
'TargetId': obj.id,
'Arn': obj.arn,
'Name': obj.name,
'Type': obj.type,
} for obj in policy.attachments
]
return dict(Targets=objects)
organizations_backend = OrganizationsBackend()

View File

@ -85,3 +85,33 @@ class OrganizationsResponse(BaseResponse):
return json.dumps(
self.organizations_backend.list_children(**self.request_params)
)
def create_policy(self):
return json.dumps(
self.organizations_backend.create_policy(**self.request_params)
)
def describe_policy(self):
return json.dumps(
self.organizations_backend.describe_policy(**self.request_params)
)
def attach_policy(self):
return json.dumps(
self.organizations_backend.attach_policy(**self.request_params)
)
def list_policies(self):
return json.dumps(
self.organizations_backend.list_policies(**self.request_params)
)
def list_policies_for_target(self):
return json.dumps(
self.organizations_backend.list_policies_for_target(**self.request_params)
)
def list_targets_for_policy(self):
return json.dumps(
self.organizations_backend.list_targets_for_policy(**self.request_params)
)

View File

@ -10,6 +10,7 @@ MASTER_ACCOUNT_ARN_FORMAT = 'arn:aws:organizations::{0}:account/{1}/{0}'
ACCOUNT_ARN_FORMAT = 'arn:aws:organizations::{0}:account/{1}/{2}'
ROOT_ARN_FORMAT = 'arn:aws:organizations::{0}:root/{1}/{2}'
OU_ARN_FORMAT = 'arn:aws:organizations::{0}:ou/{1}/{2}'
SCP_ARN_FORMAT = 'arn:aws:organizations::{0}:policy/{1}/service_control_policy/{2}'
CHARSET = string.ascii_lowercase + string.digits
ORG_ID_SIZE = 10
@ -17,6 +18,15 @@ ROOT_ID_SIZE = 4
ACCOUNT_ID_SIZE = 12
OU_ID_SUFFIX_SIZE = 8
CREATE_ACCOUNT_STATUS_ID_SIZE = 8
SCP_ID_SIZE = 8
EMAIL_REGEX = "^.+@[a-zA-Z0-9-.]+.[a-zA-Z]{2,3}|[0-9]{1,3}$"
ORG_ID_REGEX = r'o-[a-z0-9]{%s}' % ORG_ID_SIZE
ROOT_ID_REGEX = r'r-[a-z0-9]{%s}' % ROOT_ID_SIZE
OU_ID_REGEX = r'ou-[a-z0-9]{%s}-[a-z0-9]{%s}' % (ROOT_ID_SIZE, OU_ID_SUFFIX_SIZE)
ACCOUNT_ID_REGEX = r'[0-9]{%s}' % ACCOUNT_ID_SIZE
CREATE_ACCOUNT_STATUS_ID_REGEX = r'car-[a-z0-9]{%s}' % CREATE_ACCOUNT_STATUS_ID_SIZE
SCP_ID_REGEX = r'p-[a-z0-9]{%s}' % SCP_ID_SIZE
def make_random_org_id():
@ -57,3 +67,10 @@ def make_random_create_account_status_id():
# "car-" followed by from 8 to 32 lower-case letters or digits.
# e.g. 'car-35gxzwrp'
return 'car-' + ''.join(random.choice(CHARSET) for x in range(CREATE_ACCOUNT_STATUS_ID_SIZE))
def make_random_service_control_policy_id():
# The regex pattern for a policy ID string requires "p-" followed by
# from 8 to 128 lower-case letters or digits.
# e.g. 'p-k2av4a8a'
return 'p-' + ''.join(random.choice(CHARSET) for x in range(SCP_ID_SIZE))

View File

@ -5,38 +5,36 @@ import sure # noqa
import datetime
from moto.organizations import utils
EMAIL_REGEX = "^.+@[a-zA-Z0-9-.]+.[a-zA-Z]{2,3}|[0-9]{1,3}$"
ORG_ID_REGEX = r'o-[a-z0-9]{%s}' % utils.ORG_ID_SIZE
ROOT_ID_REGEX = r'r-[a-z0-9]{%s}' % utils.ROOT_ID_SIZE
OU_ID_REGEX = r'ou-[a-z0-9]{%s}-[a-z0-9]{%s}' % (utils.ROOT_ID_SIZE, utils.OU_ID_SUFFIX_SIZE)
ACCOUNT_ID_REGEX = r'[0-9]{%s}' % utils.ACCOUNT_ID_SIZE
CREATE_ACCOUNT_STATUS_ID_REGEX = r'car-[a-z0-9]{%s}' % utils.CREATE_ACCOUNT_STATUS_ID_SIZE
def test_make_random_org_id():
org_id = utils.make_random_org_id()
org_id.should.match(ORG_ID_REGEX)
org_id.should.match(utils.ORG_ID_REGEX)
def test_make_random_root_id():
root_id = utils.make_random_root_id()
root_id.should.match(ROOT_ID_REGEX)
root_id.should.match(utils.ROOT_ID_REGEX)
def test_make_random_ou_id():
root_id = utils.make_random_root_id()
ou_id = utils.make_random_ou_id(root_id)
ou_id.should.match(OU_ID_REGEX)
ou_id.should.match(utils.OU_ID_REGEX)
def test_make_random_account_id():
account_id = utils.make_random_account_id()
account_id.should.match(ACCOUNT_ID_REGEX)
account_id.should.match(utils.ACCOUNT_ID_REGEX)
def test_make_random_create_account_status_id():
create_account_status_id = utils.make_random_create_account_status_id()
create_account_status_id.should.match(CREATE_ACCOUNT_STATUS_ID_REGEX)
create_account_status_id.should.match(utils.CREATE_ACCOUNT_STATUS_ID_REGEX)
def test_make_random_service_control_policy_id():
service_control_policy_id = utils.make_random_service_control_policy_id()
service_control_policy_id.should.match(utils.SCP_ID_REGEX)
def validate_organization(response):
@ -50,7 +48,7 @@ def validate_organization(response):
'MasterAccountEmail',
'MasterAccountId',
])
org['Id'].should.match(ORG_ID_REGEX)
org['Id'].should.match(utils.ORG_ID_REGEX)
org['MasterAccountId'].should.equal(utils.MASTER_ACCOUNT_ID)
org['MasterAccountArn'].should.equal(utils.MASTER_ACCOUNT_ARN_FORMAT.format(
org['MasterAccountId'],
@ -72,7 +70,7 @@ def validate_roots(org, response):
response.should.have.key('Roots').should.be.a(list)
response['Roots'].should_not.be.empty
root = response['Roots'][0]
root.should.have.key('Id').should.match(ROOT_ID_REGEX)
root.should.have.key('Id').should.match(utils.ROOT_ID_REGEX)
root.should.have.key('Arn').should.equal(utils.ROOT_ARN_FORMAT.format(
org['MasterAccountId'],
org['Id'],
@ -87,7 +85,7 @@ def validate_roots(org, response):
def validate_organizational_unit(org, response):
response.should.have.key('OrganizationalUnit').should.be.a(dict)
ou = response['OrganizationalUnit']
ou.should.have.key('Id').should.match(OU_ID_REGEX)
ou.should.have.key('Id').should.match(utils.OU_ID_REGEX)
ou.should.have.key('Arn').should.equal(utils.OU_ARN_FORMAT.format(
org['MasterAccountId'],
org['Id'],
@ -106,13 +104,13 @@ def validate_account(org, account):
'Name',
'Status',
])
account['Id'].should.match(ACCOUNT_ID_REGEX)
account['Id'].should.match(utils.ACCOUNT_ID_REGEX)
account['Arn'].should.equal(utils.ACCOUNT_ARN_FORMAT.format(
org['MasterAccountId'],
org['Id'],
account['Id'],
))
account['Email'].should.match(EMAIL_REGEX)
account['Email'].should.match(utils.EMAIL_REGEX)
account['JoinedMethod'].should.be.within(['INVITED', 'CREATED'])
account['Status'].should.be.within(['ACTIVE', 'SUSPENDED'])
account['Name'].should.be.a(six.string_types)
@ -128,9 +126,27 @@ def validate_create_account_status(create_status):
'RequestedTimestamp',
'State',
])
create_status['Id'].should.match(CREATE_ACCOUNT_STATUS_ID_REGEX)
create_status['AccountId'].should.match(ACCOUNT_ID_REGEX)
create_status['Id'].should.match(utils.CREATE_ACCOUNT_STATUS_ID_REGEX)
create_status['AccountId'].should.match(utils.ACCOUNT_ID_REGEX)
create_status['AccountName'].should.be.a(six.string_types)
create_status['State'].should.equal('SUCCEEDED')
create_status['RequestedTimestamp'].should.be.a(datetime.datetime)
create_status['CompletedTimestamp'].should.be.a(datetime.datetime)
def validate_policy_summary(org, summary):
summary.should.be.a(dict)
summary.should.have.key('Id').should.match(utils.SCP_ID_REGEX)
summary.should.have.key('Arn').should.equal(utils.SCP_ARN_FORMAT.format(
org['MasterAccountId'],
org['Id'],
summary['Id'],
))
summary.should.have.key('Name').should.be.a(six.string_types)
summary.should.have.key('Description').should.be.a(six.string_types)
summary.should.have.key('Type').should.equal('SERVICE_CONTROL_POLICY')
summary.should.have.key('AwsManaged').should.be.a(bool)
def validate_service_control_policy(org, response):
response.should.have.key('PolicySummary').should.be.a(dict)
response.should.have.key('Content').should.be.a(six.string_types)
validate_policy_summary(org, response['PolicySummary'])

View File

@ -1,6 +1,8 @@
from __future__ import unicode_literals
import boto3
import json
import six
import sure # noqa
from botocore.exceptions import ClientError
from nose.tools import assert_raises
@ -13,6 +15,8 @@ from .organizations_test_utils import (
validate_organizational_unit,
validate_account,
validate_create_account_status,
validate_service_control_policy,
validate_policy_summary,
)
@ -320,3 +324,271 @@ def test_list_children_exception():
ex.operation_name.should.equal('ListChildren')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('InvalidInputException')
# Service Control Policies
policy_doc01 = dict(
Version='2012-10-17',
Statement=[dict(
Sid='MockPolicyStatement',
Effect='Allow',
Action='s3:*',
Resource='*',
)]
)
@mock_organizations
def test_create_policy():
client = boto3.client('organizations', region_name='us-east-1')
org = client.create_organization(FeatureSet='ALL')['Organization']
policy = client.create_policy(
Content=json.dumps(policy_doc01),
Description='A dummy service control policy',
Name='MockServiceControlPolicy',
Type='SERVICE_CONTROL_POLICY'
)['Policy']
validate_service_control_policy(org, policy)
policy['PolicySummary']['Name'].should.equal('MockServiceControlPolicy')
policy['PolicySummary']['Description'].should.equal('A dummy service control policy')
policy['Content'].should.equal(json.dumps(policy_doc01))
@mock_organizations
def test_describe_policy():
client = boto3.client('organizations', region_name='us-east-1')
org = client.create_organization(FeatureSet='ALL')['Organization']
policy_id = client.create_policy(
Content=json.dumps(policy_doc01),
Description='A dummy service control policy',
Name='MockServiceControlPolicy',
Type='SERVICE_CONTROL_POLICY'
)['Policy']['PolicySummary']['Id']
policy = client.describe_policy(PolicyId=policy_id)['Policy']
validate_service_control_policy(org, policy)
policy['PolicySummary']['Name'].should.equal('MockServiceControlPolicy')
policy['PolicySummary']['Description'].should.equal('A dummy service control policy')
policy['Content'].should.equal(json.dumps(policy_doc01))
@mock_organizations
def test_describe_policy_exception():
client = boto3.client('organizations', region_name='us-east-1')
client.create_organization(FeatureSet='ALL')['Organization']
policy_id = 'p-47fhe9s3'
with assert_raises(ClientError) as e:
response = client.describe_policy(PolicyId=policy_id)
ex = e.exception
ex.operation_name.should.equal('DescribePolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('PolicyNotFoundException')
with assert_raises(ClientError) as e:
response = client.describe_policy(PolicyId='meaninglessstring')
ex = e.exception
ex.operation_name.should.equal('DescribePolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('InvalidInputException')
@mock_organizations
def test_attach_policy():
client = boto3.client('organizations', region_name='us-east-1')
org = client.create_organization(FeatureSet='ALL')['Organization']
root_id = client.list_roots()['Roots'][0]['Id']
ou_id = client.create_organizational_unit(
ParentId=root_id,
Name='ou01',
)['OrganizationalUnit']['Id']
account_id = client.create_account(
AccountName=mockname,
Email=mockemail,
)['CreateAccountStatus']['AccountId']
policy_id = client.create_policy(
Content=json.dumps(policy_doc01),
Description='A dummy service control policy',
Name='MockServiceControlPolicy',
Type='SERVICE_CONTROL_POLICY'
)['Policy']['PolicySummary']['Id']
response = client.attach_policy(PolicyId=policy_id, TargetId=root_id)
response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
response = client.attach_policy(PolicyId=policy_id, TargetId=ou_id)
response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
response = client.attach_policy(PolicyId=policy_id, TargetId=account_id)
response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
@mock_organizations
def test_attach_policy_exception():
client = boto3.client('organizations', region_name='us-east-1')
client.create_organization(FeatureSet='ALL')['Organization']
root_id='r-dj873'
ou_id='ou-gi99-i7r8eh2i2'
account_id='126644886543'
policy_id = client.create_policy(
Content=json.dumps(policy_doc01),
Description='A dummy service control policy',
Name='MockServiceControlPolicy',
Type='SERVICE_CONTROL_POLICY'
)['Policy']['PolicySummary']['Id']
with assert_raises(ClientError) as e:
response = client.attach_policy(PolicyId=policy_id, TargetId=root_id)
ex = e.exception
ex.operation_name.should.equal('AttachPolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('OrganizationalUnitNotFoundException')
with assert_raises(ClientError) as e:
response = client.attach_policy(PolicyId=policy_id, TargetId=ou_id)
ex = e.exception
ex.operation_name.should.equal('AttachPolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('OrganizationalUnitNotFoundException')
with assert_raises(ClientError) as e:
response = client.attach_policy(PolicyId=policy_id, TargetId=account_id)
ex = e.exception
ex.operation_name.should.equal('AttachPolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('AccountNotFoundException')
with assert_raises(ClientError) as e:
response = client.attach_policy(PolicyId=policy_id, TargetId='meaninglessstring')
ex = e.exception
ex.operation_name.should.equal('AttachPolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('InvalidInputException')
@mock_organizations
def test_list_polices():
client = boto3.client('organizations', region_name='us-east-1')
org = client.create_organization(FeatureSet='ALL')['Organization']
for i in range(0,4):
client.create_policy(
Content=json.dumps(policy_doc01),
Description='A dummy service control policy',
Name='MockServiceControlPolicy' + str(i),
Type='SERVICE_CONTROL_POLICY'
)
response = client.list_policies(Filter='SERVICE_CONTROL_POLICY')
for policy in response['Policies']:
validate_policy_summary(org, policy)
@mock_organizations
def test_list_policies_for_target():
client = boto3.client('organizations', region_name='us-east-1')
org = client.create_organization(FeatureSet='ALL')['Organization']
root_id = client.list_roots()['Roots'][0]['Id']
ou_id = client.create_organizational_unit(
ParentId=root_id,
Name='ou01',
)['OrganizationalUnit']['Id']
account_id = client.create_account(
AccountName=mockname,
Email=mockemail,
)['CreateAccountStatus']['AccountId']
policy_id = client.create_policy(
Content=json.dumps(policy_doc01),
Description='A dummy service control policy',
Name='MockServiceControlPolicy',
Type='SERVICE_CONTROL_POLICY'
)['Policy']['PolicySummary']['Id']
client.attach_policy(PolicyId=policy_id, TargetId=ou_id)
response = client.list_policies_for_target(
TargetId=ou_id,
Filter='SERVICE_CONTROL_POLICY',
)
for policy in response['Policies']:
validate_policy_summary(org, policy)
client.attach_policy(PolicyId=policy_id, TargetId=account_id)
response = client.list_policies_for_target(
TargetId=account_id,
Filter='SERVICE_CONTROL_POLICY',
)
for policy in response['Policies']:
validate_policy_summary(org, policy)
@mock_organizations
def test_list_policies_for_target_exception():
client = boto3.client('organizations', region_name='us-east-1')
client.create_organization(FeatureSet='ALL')['Organization']
ou_id='ou-gi99-i7r8eh2i2'
account_id='126644886543'
with assert_raises(ClientError) as e:
response = client.list_policies_for_target(
TargetId=ou_id,
Filter='SERVICE_CONTROL_POLICY',
)
ex = e.exception
ex.operation_name.should.equal('ListPoliciesForTarget')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('OrganizationalUnitNotFoundException')
with assert_raises(ClientError) as e:
response = client.list_policies_for_target(
TargetId=account_id,
Filter='SERVICE_CONTROL_POLICY',
)
ex = e.exception
ex.operation_name.should.equal('ListPoliciesForTarget')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('AccountNotFoundException')
with assert_raises(ClientError) as e:
response = client.list_policies_for_target(
TargetId='meaninglessstring',
Filter='SERVICE_CONTROL_POLICY',
)
ex = e.exception
ex.operation_name.should.equal('ListPoliciesForTarget')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('InvalidInputException')
@mock_organizations
def test_list_targets_for_policy():
client = boto3.client('organizations', region_name='us-east-1')
org = client.create_organization(FeatureSet='ALL')['Organization']
root_id = client.list_roots()['Roots'][0]['Id']
ou_id = client.create_organizational_unit(
ParentId=root_id,
Name='ou01',
)['OrganizationalUnit']['Id']
account_id = client.create_account(
AccountName=mockname,
Email=mockemail,
)['CreateAccountStatus']['AccountId']
policy_id = client.create_policy(
Content=json.dumps(policy_doc01),
Description='A dummy service control policy',
Name='MockServiceControlPolicy',
Type='SERVICE_CONTROL_POLICY'
)['Policy']['PolicySummary']['Id']
client.attach_policy(PolicyId=policy_id, TargetId=root_id)
client.attach_policy(PolicyId=policy_id, TargetId=ou_id)
client.attach_policy(PolicyId=policy_id, TargetId=account_id)
response = client.list_targets_for_policy(PolicyId=policy_id)
for target in response['Targets']:
target.should.be.a(dict)
target.should.have.key('Name').should.be.a(six.string_types)
target.should.have.key('Arn').should.be.a(six.string_types)
target.should.have.key('TargetId').should.be.a(six.string_types)
target.should.have.key('Type').should.be.within(
['ROOT', 'ORGANIZATIONAL_UNIT', 'ACCOUNT']
)
@mock_organizations
def test_list_targets_for_policy_exception():
client = boto3.client('organizations', region_name='us-east-1')
client.create_organization(FeatureSet='ALL')['Organization']
policy_id = 'p-47fhe9s3'
with assert_raises(ClientError) as e:
response = client.list_targets_for_policy(PolicyId=policy_id)
ex = e.exception
ex.operation_name.should.equal('ListTargetsForPolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('PolicyNotFoundException')
with assert_raises(ClientError) as e:
response = client.list_targets_for_policy(PolicyId='meaninglessstring')
ex = e.exception
ex.operation_name.should.equal('ListTargetsForPolicy')
ex.response['Error']['Code'].should.equal('400')
ex.response['Error']['Message'].should.contain('InvalidInputException')