add model methods for iam attached group policies

This commit is contained in:
Jack Danger 2017-10-01 15:01:33 -07:00
parent 5bb6b98f6d
commit 37ae61871c

View File

@ -82,6 +82,10 @@ class ManagedPolicy(Policy):
self.attachment_count -= 1 self.attachment_count -= 1
del role.managed_policies[self.name] del role.managed_policies[self.name]
def attach_to_group(self, group):
self.attachment_count += 1
group.managed_policies[self.name] = self
def attach_to_user(self, user): def attach_to_user(self, user):
self.attachment_count += 1 self.attachment_count += 1
user.managed_policies[self.name] = self user.managed_policies[self.name] = self
@ -249,6 +253,7 @@ class Group(BaseModel):
) )
self.users = [] self.users = []
self.managed_policies = {}
self.policies = {} self.policies = {}
def get_cfn_attribute(self, attribute_name): def get_cfn_attribute(self, attribute_name):
@ -433,14 +438,36 @@ class IAMBackend(BaseBackend):
except KeyError: except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn)) raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
def attach_group_policy(self, policy_arn, group_name):
arns = dict((p.arn, p) for p in self.managed_policies.values())
try:
policy = arns[policy_arn]
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
policy.attach_to_group(self.get_group(group_name))
def detach_group_policy(self, policy_arn, group_name):
arns = dict((p.arn, p) for p in self.managed_policies.values())
try:
policy = arns[policy_arn]
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
policy.detach_from_group(self.get_group(group_name))
def attach_user_policy(self, policy_arn, user_name): def attach_user_policy(self, policy_arn, user_name):
arns = dict((p.arn, p) for p in self.managed_policies.values()) arns = dict((p.arn, p) for p in self.managed_policies.values())
policy = arns[policy_arn] try:
policy = arns[policy_arn]
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
policy.attach_to_user(self.get_user(user_name)) policy.attach_to_user(self.get_user(user_name))
def detach_user_policy(self, policy_arn, user_name): def detach_user_policy(self, policy_arn, user_name):
arns = dict((p.arn, p) for p in self.managed_policies.values()) arns = dict((p.arn, p) for p in self.managed_policies.values())
policy = arns[policy_arn] try:
policy = arns[policy_arn]
except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn))
policy.detach_from_user(self.get_user(user_name)) policy.detach_from_user(self.get_user(user_name))
def create_policy(self, description, path, policy_document, policy_name): def create_policy(self, description, path, policy_document, policy_name):
@ -458,39 +485,15 @@ class IAMBackend(BaseBackend):
def list_attached_role_policies(self, role_name, marker=None, max_items=100, path_prefix='/'): def list_attached_role_policies(self, role_name, marker=None, max_items=100, path_prefix='/'):
policies = self.get_role(role_name).managed_policies.values() policies = self.get_role(role_name).managed_policies.values()
return self._filter_attached_policies(policies, marker, max_items, path_prefix)
if path_prefix: def list_attached_group_policies(self, group_name, marker=None, max_items=100, path_prefix='/'):
policies = [p for p in policies if p.path.startswith(path_prefix)] policies = self.get_group(group_name).managed_policies.values()
return self._filter_attached_policies(policies, marker, max_items, path_prefix)
policies = sorted(policies, key=lambda policy: policy.name)
start_idx = int(marker) if marker else 0
policies = policies[start_idx:start_idx + max_items]
if len(policies) < max_items:
marker = None
else:
marker = str(start_idx + max_items)
return policies, marker
def list_attached_user_policies(self, user_name, marker=None, max_items=100, path_prefix='/'): def list_attached_user_policies(self, user_name, marker=None, max_items=100, path_prefix='/'):
policies = self.get_user(user_name).managed_policies.values() policies = self.get_user(user_name).managed_policies.values()
return self._filter_attached_policies(policies, marker, max_items, path_prefix)
if path_prefix:
policies = [p for p in policies if p.path.startswith(path_prefix)]
policies = sorted(policies, key=lambda policy: policy.name)
start_idx = int(marker) if marker else 0
policies = policies[start_idx:start_idx + max_items]
if len(policies) < max_items:
marker = None
else:
marker = str(start_idx + max_items)
return policies, marker
def list_policies(self, marker, max_items, only_attached, path_prefix, scope): def list_policies(self, marker, max_items, only_attached, path_prefix, scope):
policies = self.managed_policies.values() policies = self.managed_policies.values()
@ -504,6 +507,9 @@ class IAMBackend(BaseBackend):
policies = [p for p in policies if not isinstance( policies = [p for p in policies if not isinstance(
p, AWSManagedPolicy)] p, AWSManagedPolicy)]
return self._filter_attached_policies(policies, marker, max_items, path_prefix)
def _filter_attached_policies(self, policies, marker, max_items, path_prefix):
if path_prefix: if path_prefix:
policies = [p for p in policies if p.path.startswith(path_prefix)] policies = [p for p in policies if p.path.startswith(path_prefix)]