SSOAdmin: Add customer managed policy functionality (#7186)

This commit is contained in:
Joel McCoy 2024-01-04 17:05:26 -06:00 committed by GitHub
parent 3ae1b62590
commit c2139a450b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 301 additions and 0 deletions

View File

@ -70,6 +70,7 @@ class PermissionSet(BaseModel):
self.created_date = unix_time()
self.inline_policy = ""
self.managed_policies: List[ManagedPolicy] = list()
self.customer_managed_policies: List[CustomerManagedPolicy] = list()
self.total_managed_policies_attached = (
0 # this will also include customer managed policies
)
@ -107,6 +108,17 @@ class ManagedPolicy(BaseModel):
return self.arn == other.arn
class CustomerManagedPolicy(BaseModel):
def __init__(self, name: str, path: str = "/"):
self.name = name
self.path = path
def __eq__(self, other: Any) -> bool:
if not isinstance(other, CustomerManagedPolicy):
return False
return f"{self.path}{self.name}" == f"{other.path}{other.name}"
class SSOAdminBackend(BaseBackend):
"""Implementation of SSOAdmin APIs."""
@ -424,5 +436,81 @@ class SSOAdminBackend(BaseBackend):
instance_arn, permission_set_arn, managed_policy_arn
)
def attach_customer_managed_policy_reference_to_permission_set(
self,
instance_arn: str,
permission_set_arn: str,
customer_managed_policy_reference: Dict[str, str],
) -> None:
permissionset = self._find_permission_set(
permission_set_arn=permission_set_arn, instance_arn=instance_arn
)
name = customer_managed_policy_reference["Name"]
path = customer_managed_policy_reference.get("Path", "/") # default path is "/"
customer_managed_policy = CustomerManagedPolicy(name=name, path=path)
if customer_managed_policy in permissionset.customer_managed_policies:
raise ConflictException(
f"Given customer managed policy with name: {name} and path {path} already attached"
)
if (
permissionset.total_managed_policies_attached
>= MAX_MANAGED_POLICIES_PER_PERMISSION_SET
):
raise ServiceQuotaExceededException(
f"Cannot attach managed policy: number of attached managed policies is already at maximum {MAX_MANAGED_POLICIES_PER_PERMISSION_SET}"
)
permissionset.customer_managed_policies.append(customer_managed_policy)
permissionset.total_managed_policies_attached += 1
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc]
def list_customer_managed_policy_references_in_permission_set(
self, instance_arn: str, permission_set_arn: str
) -> List[CustomerManagedPolicy]:
permissionset = self._find_permission_set(
permission_set_arn=permission_set_arn, instance_arn=instance_arn
)
return permissionset.customer_managed_policies
def _detach_customer_managed_policy_from_permissionset(
self,
instance_arn: str,
permission_set_arn: str,
customer_managed_policy_reference: Dict[str, str],
) -> None:
permissionset = self._find_permission_set(
permission_set_arn=permission_set_arn, instance_arn=instance_arn
)
path: str = customer_managed_policy_reference.get("Path", "/")
name: str = customer_managed_policy_reference["Name"]
for customer_managed_policy in permissionset.customer_managed_policies:
if (
customer_managed_policy.name == name
and customer_managed_policy.path == path
):
permissionset.customer_managed_policies.remove(customer_managed_policy)
permissionset.total_managed_policies_attached -= 1
return
raise ResourceNotFoundException(
f"Given managed policy with name: {name} and path {path} does not exist on PermissionSet"
)
def detach_customer_managed_policy_reference_from_permission_set(
self,
instance_arn: str,
permission_set_arn: str,
customer_managed_policy_reference: Dict[str, str],
) -> None:
self._detach_customer_managed_policy_from_permissionset(
instance_arn=instance_arn,
permission_set_arn=permission_set_arn,
customer_managed_policy_reference=customer_managed_policy_reference,
)
ssoadmin_backends = BackendDict(SSOAdminBackend, "sso")

View File

@ -244,3 +244,59 @@ class SSOAdminResponse(BaseResponse):
managed_policy_arn=managed_policy_arn,
)
return json.dumps({})
def attach_customer_managed_policy_reference_to_permission_set(self) -> str:
instance_arn = self._get_param("InstanceArn")
permission_set_arn = self._get_param("PermissionSetArn")
customer_managed_policy_reference = self._get_param(
"CustomerManagedPolicyReference"
)
self.ssoadmin_backend.attach_customer_managed_policy_reference_to_permission_set(
instance_arn=instance_arn,
permission_set_arn=permission_set_arn,
customer_managed_policy_reference=customer_managed_policy_reference,
)
return json.dumps({})
def list_customer_managed_policy_references_in_permission_set(self) -> str:
instance_arn = self._get_param("InstanceArn")
permission_set_arn = self._get_param("PermissionSetArn")
max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken")
(
customer_managed_policy_references,
next_token,
) = self.ssoadmin_backend.list_customer_managed_policy_references_in_permission_set(
instance_arn=instance_arn,
permission_set_arn=permission_set_arn,
max_results=max_results,
next_token=next_token,
)
customer_managed_policy_references_response = [
{
"Name": customer_managed_policy_reference.name,
"Path": customer_managed_policy_reference.path,
}
for customer_managed_policy_reference in customer_managed_policy_references
]
return json.dumps(
{
"CustomerManagedPolicyReferences": customer_managed_policy_references_response,
"NextToken": next_token,
}
)
def detach_customer_managed_policy_reference_from_permission_set(self) -> str:
instance_arn = self._get_param("InstanceArn")
permission_set_arn = self._get_param("PermissionSetArn")
customer_managed_policy_reference = self._get_param(
"CustomerManagedPolicyReference"
)
self.ssoadmin_backend.detach_customer_managed_policy_reference_from_permission_set(
instance_arn=instance_arn,
permission_set_arn=permission_set_arn,
customer_managed_policy_reference=customer_managed_policy_reference,
)
return json.dumps({})

View File

@ -37,4 +37,11 @@ PAGINATION_MODEL = {
"result_key": "AttachedManagedPolicies",
"unique_attribute": ["arn"],
},
"list_customer_managed_policy_references_in_permission_set": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"result_key": "CustomerManagedPolicyReferences",
"unique_attribute": ["name", "path"],
},
}

View File

@ -328,3 +328,153 @@ def test_detach_managed_policy_from_permission_set():
)
assert len(response["AttachedManagedPolicies"]) == 0
@mock_ssoadmin
def test_attach_customer_managed_policy_reference_to_permission_set():
client = boto3.client("sso-admin", region_name="us-east-1")
permission_set_arn = create_permissionset(client)
policy_name = "test-policy"
policy_path = "/test-path/"
client.attach_customer_managed_policy_reference_to_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
CustomerManagedPolicyReference={
"Name": policy_name,
"Path": policy_path,
},
)
response = client.list_customer_managed_policy_references_in_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
)
assert len(response["CustomerManagedPolicyReferences"]) == 1
assert response["CustomerManagedPolicyReferences"][0]["Name"] == policy_name
assert response["CustomerManagedPolicyReferences"][0]["Path"] == policy_path
# test for customer managed policy that is already attached
with pytest.raises(ClientError) as e:
client.attach_customer_managed_policy_reference_to_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
CustomerManagedPolicyReference={
"Name": policy_name,
"Path": policy_path,
},
)
err = e.value.response["Error"]
assert err["Code"] == "ConflictException"
assert (
err["Message"]
== f"Given customer managed policy with name: {policy_name} and path {policy_path} already attached"
)
@mock_ssoadmin
def test_list_customer_managed_policy_references_in_permission_set():
"""
Tests listing customer managed policies including pagination.
"""
client = boto3.client("sso-admin", region_name="us-east-1")
permission_set_arn = create_permissionset(client)
policy_name = "test-policy-"
# attach 3 customer managed policies
for idx in range(3):
client.attach_customer_managed_policy_reference_to_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
CustomerManagedPolicyReference={"Name": f"{policy_name}{idx}"},
)
response = client.list_customer_managed_policy_references_in_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
MaxResults=2,
)
customer_managed_policy_names = []
assert len(response["CustomerManagedPolicyReferences"]) == 2
next_token = response["NextToken"]
for name in response["CustomerManagedPolicyReferences"]:
customer_managed_policy_names.append(name["Name"])
response = client.list_customer_managed_policy_references_in_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
MaxResults=2,
NextToken=next_token,
)
for name in response["CustomerManagedPolicyReferences"]:
customer_managed_policy_names.append(name["Name"])
assert len(response["CustomerManagedPolicyReferences"]) == 1
# ensure the 3 unique customer managed policies were returned
assert len(set(customer_managed_policy_names)) == 3
@mock_ssoadmin
def test_detach_customer_managed_policy_reference_from_permission_set():
client = boto3.client("sso-admin", region_name="us-east-1")
permission_set_arn = create_permissionset(client)
# trying to detach a policy that doesn't exist yet
with pytest.raises(ClientError) as e:
client.detach_customer_managed_policy_reference_from_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
CustomerManagedPolicyReference={
"Name": "test-policy",
},
)
err = e.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
assert (
err["Message"]
== "Given managed policy with name: test-policy and path / does not exist on PermissionSet"
)
# attach a policy
client.attach_customer_managed_policy_reference_to_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
CustomerManagedPolicyReference={
"Name": "test-policy",
"Path": "/some-path/",
},
)
# try to detach the policy but default path (should fail)
with pytest.raises(ClientError) as e:
client.detach_customer_managed_policy_reference_from_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
CustomerManagedPolicyReference={
"Name": "test-policy",
},
)
# detach the policy
client.detach_customer_managed_policy_reference_from_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
CustomerManagedPolicyReference={
"Name": "test-policy",
"Path": "/some-path/",
},
)
# ensure policy is detached
response = client.list_customer_managed_policy_references_in_permission_set(
InstanceArn=DUMMY_INSTANCE_ARN,
PermissionSetArn=permission_set_arn,
)
assert len(response["CustomerManagedPolicyReferences"]) == 0