SSO-admin: Add basic inline policy crud operations (#7182)
This commit is contained in:
parent
c436bb93ef
commit
5fadeb869e
@ -2,6 +2,10 @@
|
|||||||
from moto.core.exceptions import JsonRESTError
|
from moto.core.exceptions import JsonRESTError
|
||||||
|
|
||||||
|
|
||||||
class ResourceNotFound(JsonRESTError):
|
class ResourceNotFoundException(JsonRESTError):
|
||||||
def __init__(self) -> None:
|
def __init__(self, message: str = "Account not found") -> None:
|
||||||
super().__init__("ResourceNotFound", "Account not found")
|
super().__init__(
|
||||||
|
error_type="ResourceNotFoundException",
|
||||||
|
message=message,
|
||||||
|
code="ResourceNotFoundException",
|
||||||
|
)
|
||||||
|
@ -5,7 +5,7 @@ from moto.core.utils import unix_time
|
|||||||
from moto.moto_api._internal import mock_random as random
|
from moto.moto_api._internal import mock_random as random
|
||||||
from moto.utilities.paginator import paginate
|
from moto.utilities.paginator import paginate
|
||||||
|
|
||||||
from .exceptions import ResourceNotFound
|
from .exceptions import ResourceNotFoundException
|
||||||
from .utils import PAGINATION_MODEL
|
from .utils import PAGINATION_MODEL
|
||||||
|
|
||||||
|
|
||||||
@ -59,6 +59,7 @@ class PermissionSet(BaseModel):
|
|||||||
self.relay_state = relay_state
|
self.relay_state = relay_state
|
||||||
self.tags = tags
|
self.tags = tags
|
||||||
self.created_date = unix_time()
|
self.created_date = unix_time()
|
||||||
|
self.inline_policy = ""
|
||||||
|
|
||||||
def to_json(self, include_creation_date: bool = False) -> Dict[str, Any]:
|
def to_json(self, include_creation_date: bool = False) -> Dict[str, Any]:
|
||||||
summary: Dict[str, Any] = {
|
summary: Dict[str, Any] = {
|
||||||
@ -155,7 +156,7 @@ class SSOAdminBackend(BaseBackend):
|
|||||||
and principal_id_match
|
and principal_id_match
|
||||||
):
|
):
|
||||||
return account
|
return account
|
||||||
raise ResourceNotFound
|
raise ResourceNotFoundException
|
||||||
|
|
||||||
@paginate(PAGINATION_MODEL) # type: ignore[misc]
|
@paginate(PAGINATION_MODEL) # type: ignore[misc]
|
||||||
def list_account_assignments(
|
def list_account_assignments(
|
||||||
@ -273,7 +274,10 @@ class SSOAdminBackend(BaseBackend):
|
|||||||
)
|
)
|
||||||
if instance_arn_match and permission_set_match:
|
if instance_arn_match and permission_set_match:
|
||||||
return permission_set
|
return permission_set
|
||||||
raise ResourceNotFound
|
ps_id = permission_set_arn.split("/")[-1]
|
||||||
|
raise ResourceNotFoundException(
|
||||||
|
message=f"Could not find PermissionSet with id {ps_id}"
|
||||||
|
)
|
||||||
|
|
||||||
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc]
|
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc]
|
||||||
def list_permission_sets(self, instance_arn: str) -> List[PermissionSet]:
|
def list_permission_sets(self, instance_arn: str) -> List[PermissionSet]:
|
||||||
@ -283,5 +287,32 @@ class SSOAdminBackend(BaseBackend):
|
|||||||
permission_sets.append(permission_set)
|
permission_sets.append(permission_set)
|
||||||
return permission_sets
|
return permission_sets
|
||||||
|
|
||||||
|
def put_inline_policy_to_permission_set(
|
||||||
|
self, instance_arn: str, permission_set_arn: str, inline_policy: str
|
||||||
|
) -> None:
|
||||||
|
permission_set = self._find_permission_set(
|
||||||
|
instance_arn,
|
||||||
|
permission_set_arn,
|
||||||
|
)
|
||||||
|
permission_set.inline_policy = inline_policy
|
||||||
|
|
||||||
|
def get_inline_policy_for_permission_set(
|
||||||
|
self, instance_arn: str, permission_set_arn: str
|
||||||
|
) -> str:
|
||||||
|
permission_set = self._find_permission_set(
|
||||||
|
instance_arn,
|
||||||
|
permission_set_arn,
|
||||||
|
)
|
||||||
|
return permission_set.inline_policy
|
||||||
|
|
||||||
|
def delete_inline_policy_from_permission_set(
|
||||||
|
self, instance_arn: str, permission_set_arn: str
|
||||||
|
) -> None:
|
||||||
|
permission_set = self._find_permission_set(
|
||||||
|
instance_arn,
|
||||||
|
permission_set_arn,
|
||||||
|
)
|
||||||
|
permission_set.inline_policy = ""
|
||||||
|
|
||||||
|
|
||||||
ssoadmin_backends = BackendDict(SSOAdminBackend, "sso")
|
ssoadmin_backends = BackendDict(SSOAdminBackend, "sso")
|
||||||
|
@ -166,3 +166,32 @@ class SSOAdminResponse(BaseResponse):
|
|||||||
if next_token:
|
if next_token:
|
||||||
response["NextToken"] = next_token
|
response["NextToken"] = next_token
|
||||||
return json.dumps(response)
|
return json.dumps(response)
|
||||||
|
|
||||||
|
def put_inline_policy_to_permission_set(self) -> str:
|
||||||
|
instance_arn = self._get_param("InstanceArn")
|
||||||
|
permission_set_arn = self._get_param("PermissionSetArn")
|
||||||
|
inline_policy = self._get_param("InlinePolicy")
|
||||||
|
self.ssoadmin_backend.put_inline_policy_to_permission_set(
|
||||||
|
instance_arn=instance_arn,
|
||||||
|
permission_set_arn=permission_set_arn,
|
||||||
|
inline_policy=inline_policy,
|
||||||
|
)
|
||||||
|
return json.dumps({})
|
||||||
|
|
||||||
|
def get_inline_policy_for_permission_set(self) -> str:
|
||||||
|
instance_arn = self._get_param("InstanceArn")
|
||||||
|
permission_set_arn = self._get_param("PermissionSetArn")
|
||||||
|
inline_policy = self.ssoadmin_backend.get_inline_policy_for_permission_set(
|
||||||
|
instance_arn=instance_arn,
|
||||||
|
permission_set_arn=permission_set_arn,
|
||||||
|
)
|
||||||
|
return json.dumps({"InlinePolicy": inline_policy})
|
||||||
|
|
||||||
|
def delete_inline_policy_from_permission_set(self) -> str:
|
||||||
|
instance_arn = self._get_param("InstanceArn")
|
||||||
|
permission_set_arn = self._get_param("PermissionSetArn")
|
||||||
|
self.ssoadmin_backend.delete_inline_policy_from_permission_set(
|
||||||
|
instance_arn=instance_arn,
|
||||||
|
permission_set_arn=permission_set_arn,
|
||||||
|
)
|
||||||
|
return json.dumps({})
|
||||||
|
@ -118,7 +118,7 @@ def test_delete_account_assignment_unknown():
|
|||||||
PrincipalId=principal_id,
|
PrincipalId=principal_id,
|
||||||
)
|
)
|
||||||
err = exc.value.response["Error"]
|
err = exc.value.response["Error"]
|
||||||
assert err["Code"] == "ResourceNotFound"
|
assert err["Code"] == "ResourceNotFoundException"
|
||||||
|
|
||||||
|
|
||||||
@mock_ssoadmin
|
@mock_ssoadmin
|
||||||
@ -451,7 +451,7 @@ def test_update_permission_set_unknown():
|
|||||||
RelayState="https://console.aws.amazon.com/s3",
|
RelayState="https://console.aws.amazon.com/s3",
|
||||||
)
|
)
|
||||||
err = exc.value.response["Error"]
|
err = exc.value.response["Error"]
|
||||||
assert err["Code"] == "ResourceNotFound"
|
assert err["Code"] == "ResourceNotFoundException"
|
||||||
|
|
||||||
|
|
||||||
@mock_ssoadmin
|
@mock_ssoadmin
|
||||||
@ -488,7 +488,7 @@ def test_describe_permission_set_unknown():
|
|||||||
PermissionSetArn="arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo",
|
PermissionSetArn="arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo",
|
||||||
)
|
)
|
||||||
err = exc.value.response["Error"]
|
err = exc.value.response["Error"]
|
||||||
assert err["Code"] == "ResourceNotFound"
|
assert err["Code"] == "ResourceNotFoundException"
|
||||||
|
|
||||||
|
|
||||||
@mock_ssoadmin
|
@mock_ssoadmin
|
||||||
@ -511,7 +511,7 @@ def test_delete_permission_set():
|
|||||||
PermissionSetArn=permission_set["PermissionSetArn"],
|
PermissionSetArn=permission_set["PermissionSetArn"],
|
||||||
)
|
)
|
||||||
err = exc.value.response["Error"]
|
err = exc.value.response["Error"]
|
||||||
assert err["Code"] == "ResourceNotFound"
|
assert err["Code"] == "ResourceNotFoundException"
|
||||||
|
|
||||||
|
|
||||||
@mock_ssoadmin
|
@mock_ssoadmin
|
||||||
@ -524,7 +524,7 @@ def test_delete_permission_set_unknown():
|
|||||||
PermissionSetArn="arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo",
|
PermissionSetArn="arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo",
|
||||||
)
|
)
|
||||||
err = exc.value.response["Error"]
|
err = exc.value.response["Error"]
|
||||||
assert err["Code"] == "ResourceNotFound"
|
assert err["Code"] == "ResourceNotFoundException"
|
||||||
|
|
||||||
|
|
||||||
@mock_ssoadmin
|
@mock_ssoadmin
|
||||||
|
132
tests/test_ssoadmin/test_ssoadmin_policies.py
Normal file
132
tests/test_ssoadmin/test_ssoadmin_policies.py
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
import boto3
|
||||||
|
import pytest
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
|
||||||
|
from moto import mock_ssoadmin
|
||||||
|
|
||||||
|
# See our Development Tips on writing tests for hints on how to write good tests:
|
||||||
|
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
|
||||||
|
|
||||||
|
DUMMY_PERMISSIONSET_ID = (
|
||||||
|
"arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo"
|
||||||
|
)
|
||||||
|
DUMMY_INSTANCE_ARN = "arn:aws:sso:::instance/ins-aaaabbbbccccdddd"
|
||||||
|
|
||||||
|
|
||||||
|
def create_permissionset(client) -> str:
|
||||||
|
"""Helper function to create a dummy permission set and returns the arn."""
|
||||||
|
|
||||||
|
response = client.create_permission_set(
|
||||||
|
Name="test-permission-set",
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
Description="test permission set",
|
||||||
|
)
|
||||||
|
|
||||||
|
return response["PermissionSet"]["PermissionSetArn"]
|
||||||
|
|
||||||
|
|
||||||
|
@mock_ssoadmin
|
||||||
|
def test_put_inline_policy_to_permission_set():
|
||||||
|
"""
|
||||||
|
Tests putting and getting an inline policy to a permission set.
|
||||||
|
"""
|
||||||
|
client = boto3.client("sso-admin", region_name="us-east-1")
|
||||||
|
|
||||||
|
permission_set_arn = create_permissionset(client)
|
||||||
|
dummy_policy = {
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [
|
||||||
|
{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Action": "s3:GetObject",
|
||||||
|
"Resource": "arn:aws:s3:::your-bucket-name/*",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
# Happy path
|
||||||
|
response = client.put_inline_policy_to_permission_set(
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
PermissionSetArn=permission_set_arn,
|
||||||
|
InlinePolicy=json.dumps(dummy_policy),
|
||||||
|
)
|
||||||
|
|
||||||
|
response = client.get_inline_policy_for_permission_set(
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
PermissionSetArn=permission_set_arn,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response["InlinePolicy"] == json.dumps(dummy_policy)
|
||||||
|
|
||||||
|
# Invalid permission set arn
|
||||||
|
not_create_ps_arn = (
|
||||||
|
"arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoxyz"
|
||||||
|
)
|
||||||
|
with pytest.raises(ClientError) as e:
|
||||||
|
client.put_inline_policy_to_permission_set(
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
PermissionSetArn=not_create_ps_arn,
|
||||||
|
InlinePolicy=json.dumps(dummy_policy),
|
||||||
|
)
|
||||||
|
err = e.value.response["Error"]
|
||||||
|
assert err["Code"] == "ResourceNotFoundException"
|
||||||
|
assert err["Message"] == "Could not find PermissionSet with id ps-hhhhkkkkppppoxyz"
|
||||||
|
|
||||||
|
|
||||||
|
@mock_ssoadmin
|
||||||
|
def test_get_inline_policy_to_permission_set_no_policy():
|
||||||
|
client = boto3.client("sso-admin", region_name="us-east-1")
|
||||||
|
|
||||||
|
permission_set_arn = create_permissionset(client)
|
||||||
|
|
||||||
|
response = client.get_inline_policy_for_permission_set(
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
PermissionSetArn=permission_set_arn,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response["InlinePolicy"] == ""
|
||||||
|
|
||||||
|
|
||||||
|
@mock_ssoadmin
|
||||||
|
def test_delete_inline_policy_to_permissionset():
|
||||||
|
client = boto3.client("sso-admin", region_name="us-east-1")
|
||||||
|
|
||||||
|
permission_set_arn = create_permissionset(client)
|
||||||
|
|
||||||
|
dummy_policy = {
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [
|
||||||
|
{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Action": "s3:GetObject",
|
||||||
|
"Resource": "arn:aws:s3:::your-bucket-name/*",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
client.put_inline_policy_to_permission_set(
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
PermissionSetArn=permission_set_arn,
|
||||||
|
InlinePolicy=json.dumps(dummy_policy),
|
||||||
|
)
|
||||||
|
|
||||||
|
response = client.get_inline_policy_for_permission_set(
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
PermissionSetArn=permission_set_arn,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response["InlinePolicy"] == json.dumps(dummy_policy)
|
||||||
|
|
||||||
|
client.delete_inline_policy_from_permission_set(
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
PermissionSetArn=permission_set_arn,
|
||||||
|
)
|
||||||
|
|
||||||
|
response = client.get_inline_policy_for_permission_set(
|
||||||
|
InstanceArn=DUMMY_INSTANCE_ARN,
|
||||||
|
PermissionSetArn=permission_set_arn,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response["InlinePolicy"] == ""
|
Loading…
Reference in New Issue
Block a user