SSO-Admin - Initial implementation (#4743)

This commit is contained in:
Bert Blommers 2022-01-07 15:28:29 -01:00 committed by GitHub
parent 526559e22c
commit a5173904c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 448 additions and 0 deletions

View File

@ -142,6 +142,7 @@ mock_sns_deprecated = lazy_load(".sns", "mock_sns_deprecated")
mock_sqs = lazy_load(".sqs", "mock_sqs")
mock_sqs_deprecated = lazy_load(".sqs", "mock_sqs_deprecated")
mock_ssm = lazy_load(".ssm", "mock_ssm")
mock_ssoadmin = lazy_load(".ssoadmin", "mock_ssoadmin", boto3_name="sso-admin")
mock_stepfunctions = lazy_load(
".stepfunctions", "mock_stepfunctions", backend="stepfunction_backends"
)

View File

@ -123,6 +123,7 @@ backend_url_patterns = [
("sqs", re.compile("https?://(.*\\.)?(queue|sqs)\\.(.*\\.)?amazonaws\\.com")),
("ssm", re.compile("https?://ssm\\.(.+)\\.amazonaws\\.com")),
("ssm", re.compile("https?://ssm\\.(.+)\\.amazonaws\\.com\\.cn")),
("sso-admin", re.compile("https?://sso\\.(.+)\\.amazonaws\\.com")),
("stepfunctions", re.compile("https?://states\\.(.+)\\.amazonaws.com")),
("sts", re.compile("https?://sts\\.(.*\\.)?amazonaws\\.com")),
("support", re.compile("https?://support\\.(.+)\\.amazonaws\\.com")),

View File

@ -0,0 +1,5 @@
"""ssoadmin module initialization; sets value for base decorator."""
from .models import ssoadmin_backends
from ..core.models import base_decorator
mock_ssoadmin = base_decorator(ssoadmin_backends)

View File

@ -0,0 +1,7 @@
"""Exceptions raised by the ssoadmin service."""
from moto.core.exceptions import JsonRESTError
class ResourceNotFound(JsonRESTError):
def __init__(self):
super().__init__("ResourceNotFound", "Account not found")

142
moto/ssoadmin/models.py Normal file
View File

@ -0,0 +1,142 @@
from .exceptions import ResourceNotFound
from moto.core import BaseBackend, BaseModel
from moto.core.utils import BackendDict, unix_time
from uuid import uuid4
class AccountAssignment(BaseModel):
def __init__(
self,
instance_arn,
target_id,
target_type,
permission_set_arn,
principal_type,
principal_id,
):
self.request_id = str(uuid4())
self.instance_arn = instance_arn
self.target_id = target_id
self.target_type = target_type
self.permission_set_arn = permission_set_arn
self.principal_type = principal_type
self.principal_id = principal_id
self.created_date = unix_time()
def to_json(self, include_creation_date=False):
summary = {
"TargetId": self.target_id,
"TargetType": self.target_type,
"PermissionSetArn": self.permission_set_arn,
"PrincipalType": self.principal_type,
"PrincipalId": self.principal_id,
}
if include_creation_date:
summary["CreatedDate"] = self.created_date
return summary
class SSOAdminBackend(BaseBackend):
"""Implementation of SSOAdmin APIs."""
def __init__(self, region_name=None):
self.region_name = region_name
self.account_assignments = list()
def reset(self):
"""Re-initialize all attributes for this instance."""
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def create_account_assignment(
self,
instance_arn,
target_id,
target_type,
permission_set_arn,
principal_type,
principal_id,
):
assignment = AccountAssignment(
instance_arn,
target_id,
target_type,
permission_set_arn,
principal_type,
principal_id,
)
self.account_assignments.append(assignment)
return assignment.to_json()
def delete_account_assignment(
self,
instance_arn,
target_id,
target_type,
permission_set_arn,
principal_type,
principal_id,
):
account = self._find_account(
instance_arn,
target_id,
target_type,
permission_set_arn,
principal_type,
principal_id,
)
self.account_assignments.remove(account)
return account.to_json(include_creation_date=True)
def _find_account(
self,
instance_arn,
target_id,
target_type,
permission_set_arn,
principal_type,
principal_id,
):
for account in self.account_assignments:
instance_arn_match = account.instance_arn == instance_arn
target_id_match = account.target_id == target_id
target_type_match = account.target_type == target_type
permission_set_match = account.permission_set_arn == permission_set_arn
principal_type_match = account.principal_type == principal_type
principal_id_match = account.principal_id == principal_id
if (
instance_arn_match
and target_id_match
and target_type_match
and permission_set_match
and principal_type_match
and principal_id_match
):
return account
raise ResourceNotFound
def list_account_assignments(self, instance_arn, account_id, permission_set_arn):
"""
Pagination has not yet been implemented
"""
account_assignments = []
for assignment in self.account_assignments:
if (
assignment.instance_arn == instance_arn
and assignment.target_id == account_id
and assignment.permission_set_arn == permission_set_arn
):
account_assignments.append(
{
"AccountId": account_id,
"PermissionSetArn": assignment.permission_set_arn,
"PrincipalType": assignment.principal_type,
"PrincipalId": assignment.principal_id,
}
)
return account_assignments
ssoadmin_backends = BackendDict(SSOAdminBackend, "sso")

View File

@ -0,0 +1,67 @@
import json
from moto.core.responses import BaseResponse
from uuid import uuid4
from .models import ssoadmin_backends
class SSOAdminResponse(BaseResponse):
"""Handler for SSOAdmin requests and responses."""
@property
def ssoadmin_backend(self):
"""Return backend instance specific for this region."""
return ssoadmin_backends[self.region]
def create_account_assignment(self):
params = json.loads(self.body)
instance_arn = params.get("InstanceArn")
target_id = params.get("TargetId")
target_type = params.get("TargetType")
permission_set_arn = params.get("PermissionSetArn")
principal_type = params.get("PrincipalType")
principal_id = params.get("PrincipalId")
summary = self.ssoadmin_backend.create_account_assignment(
instance_arn=instance_arn,
target_id=target_id,
target_type=target_type,
permission_set_arn=permission_set_arn,
principal_type=principal_type,
principal_id=principal_id,
)
summary["Status"] = "SUCCEEDED"
summary["RequestId"] = str(uuid4())
return json.dumps({"AccountAssignmentCreationStatus": summary})
def delete_account_assignment(self):
params = json.loads(self.body)
instance_arn = params.get("InstanceArn")
target_id = params.get("TargetId")
target_type = params.get("TargetType")
permission_set_arn = params.get("PermissionSetArn")
principal_type = params.get("PrincipalType")
principal_id = params.get("PrincipalId")
summary = self.ssoadmin_backend.delete_account_assignment(
instance_arn=instance_arn,
target_id=target_id,
target_type=target_type,
permission_set_arn=permission_set_arn,
principal_type=principal_type,
principal_id=principal_id,
)
summary["Status"] = "SUCCEEDED"
summary["RequestId"] = str(uuid4())
return json.dumps({"AccountAssignmentDeletionStatus": summary})
def list_account_assignments(self):
params = json.loads(self.body)
instance_arn = params.get("InstanceArn")
account_id = params.get("AccountId")
permission_set_arn = params.get("PermissionSetArn")
assignments = self.ssoadmin_backend.list_account_assignments(
instance_arn=instance_arn,
account_id=account_id,
permission_set_arn=permission_set_arn,
)
return json.dumps({"AccountAssignments": assignments})

11
moto/ssoadmin/urls.py Normal file
View File

@ -0,0 +1,11 @@
"""ssoadmin base URL and path."""
from .responses import SSOAdminResponse
url_bases = [
r"https?://sso\.(.+)\.amazonaws\.com",
]
url_paths = {
"{0}/$": SSOAdminResponse.dispatch,
}

View File

View File

@ -0,0 +1,24 @@
import json
import sure # noqa # pylint: disable=unused-import
import moto.server as server
def test_ssoadmin_list():
backend = server.create_backend_app("sso-admin")
test_client = backend.test_client()
headers = {
"X-Amz-Target": "SWBExternalService.ListAccountAssignments",
"User-Agent": "aws-cli/2.2.47 Python/3.8.8 Linux/5.11.0-44-generic exe/x86_64.ubuntu.20 prompt/off command/sso-admin.list-account-assignments",
}
data = {
"InstanceArn": "arn:aws:sso:::instance/ins-aaaabbbbccccdddd",
"AccountId": "222222222222",
"PermissionSetArn": "arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo",
}
resp = test_client.post("/", headers=headers, data=json.dumps(data))
resp.status_code.should.equal(200)
json.loads(resp.data).should.equal({"AccountAssignments": []})

View File

@ -0,0 +1,190 @@
import boto3
import datetime
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_ssoadmin
from uuid import uuid4
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_ssoadmin
def test_create_account_assignment():
client = boto3.client("sso-admin", region_name="eu-west-1")
target_id = "222222222222"
permission_set_arn = (
"arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo"
)
principal_id = str(uuid4())
resp = client.create_account_assignment(
InstanceArn="arn:aws:sso:::instance/ins-aaaabbbbccccdddd",
TargetId=target_id,
TargetType="AWS_ACCOUNT",
PermissionSetArn=permission_set_arn,
PrincipalType="USER",
PrincipalId=principal_id,
)
resp.should.have.key("AccountAssignmentCreationStatus")
status = resp["AccountAssignmentCreationStatus"]
status.should.have.key("Status").equals("SUCCEEDED")
status.should.have.key("RequestId")
status.shouldnt.have.key("FailureReason")
status.should.have.key("TargetId").equals(target_id)
status.should.have.key("TargetType").equals("AWS_ACCOUNT")
status.should.have.key("PermissionSetArn").equals(permission_set_arn)
status.should.have.key("PrincipalType").equals("USER")
status.should.have.key("PrincipalId").equals(principal_id)
@mock_ssoadmin
def test_delete_account_assignment():
client = boto3.client("sso-admin", region_name="eu-west-1")
target_id = "222222222222"
permission_set_arn = (
"arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo"
)
principal_id = str(uuid4())
instance_arn = "arn:aws:sso:::instance/ins-aaaabbbbccccdddd"
client.create_account_assignment(
InstanceArn=instance_arn,
TargetId=target_id,
TargetType="AWS_ACCOUNT",
PermissionSetArn=permission_set_arn,
PrincipalType="USER",
PrincipalId=principal_id,
)
resp = client.delete_account_assignment(
InstanceArn=instance_arn,
TargetId=target_id,
TargetType="AWS_ACCOUNT",
PermissionSetArn=permission_set_arn,
PrincipalType="USER",
PrincipalId=principal_id,
)
resp.should.have.key("AccountAssignmentDeletionStatus")
# Verify the correct response
status = resp["AccountAssignmentDeletionStatus"]
status.should.have.key("Status").equals("SUCCEEDED")
status.should.have.key("RequestId")
status.shouldnt.have.key("FailureReason")
status.should.have.key("TargetId").equals(target_id)
status.should.have.key("TargetType").equals("AWS_ACCOUNT")
status.should.have.key("PermissionSetArn").equals(permission_set_arn)
status.should.have.key("PrincipalType").equals("USER")
status.should.have.key("PrincipalId").equals(principal_id)
status.should.have.key("CreatedDate").should.be.a(datetime.datetime)
# Verify this account assignment can no longer be found
resp = client.list_account_assignments(
InstanceArn=instance_arn,
AccountId=target_id,
PermissionSetArn=permission_set_arn,
)
resp.should.have.key("AccountAssignments").equals([])
@mock_ssoadmin
def test_delete_account_assignment_unknown():
client = boto3.client("sso-admin", region_name="us-east-1")
target_id = "222222222222"
permission_set_arn = (
"arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo"
)
principal_id = str(uuid4())
instance_arn = "arn:aws:sso:::instance/ins-aaaabbbbccccdddd"
with pytest.raises(ClientError) as exc:
client.delete_account_assignment(
InstanceArn=instance_arn,
TargetId=target_id,
TargetType="AWS_ACCOUNT",
PermissionSetArn=permission_set_arn,
PrincipalType="USER",
PrincipalId=principal_id,
)
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFound")
@mock_ssoadmin
def test_list_account_assignments():
client = boto3.client("sso-admin", region_name="ap-southeast-1")
target_id1 = "222222222222"
target_id2 = "333333333333"
permission_set_arn = (
"arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo"
)
principal_id = str(uuid4())
instance_arn = "arn:aws:sso:::instance/ins-aaaabbbbccccdddd"
resp = client.list_account_assignments(
InstanceArn=instance_arn,
AccountId=target_id1,
PermissionSetArn=permission_set_arn,
)
resp.should.have.key("AccountAssignments").equals([])
client.create_account_assignment(
InstanceArn=instance_arn,
TargetId=target_id1,
TargetType="AWS_ACCOUNT",
PermissionSetArn=permission_set_arn,
PrincipalType="USER",
PrincipalId=principal_id,
)
resp = client.list_account_assignments(
InstanceArn=instance_arn,
AccountId=target_id1,
PermissionSetArn=permission_set_arn,
)
resp.should.have.key("AccountAssignments").equals(
[
{
"AccountId": target_id1,
"PermissionSetArn": permission_set_arn,
"PrincipalType": "USER",
"PrincipalId": principal_id,
}
]
)
client.create_account_assignment(
InstanceArn=instance_arn,
TargetId=target_id2,
TargetType="AWS_ACCOUNT",
PermissionSetArn=permission_set_arn,
PrincipalType="USER",
PrincipalId=principal_id,
)
resp = client.list_account_assignments(
InstanceArn=instance_arn,
AccountId=target_id2,
PermissionSetArn=permission_set_arn,
)
resp.should.have.key("AccountAssignments").equals(
[
{
"AccountId": target_id2,
"PermissionSetArn": permission_set_arn,
"PrincipalType": "USER",
"PrincipalId": principal_id,
}
]
)