diff --git a/moto/__init__.py b/moto/__init__.py index d09b8949c..e2e2277fd 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -142,6 +142,7 @@ mock_sns_deprecated = lazy_load(".sns", "mock_sns_deprecated") mock_sqs = lazy_load(".sqs", "mock_sqs") mock_sqs_deprecated = lazy_load(".sqs", "mock_sqs_deprecated") mock_ssm = lazy_load(".ssm", "mock_ssm") +mock_ssoadmin = lazy_load(".ssoadmin", "mock_ssoadmin", boto3_name="sso-admin") mock_stepfunctions = lazy_load( ".stepfunctions", "mock_stepfunctions", backend="stepfunction_backends" ) diff --git a/moto/backend_index.py b/moto/backend_index.py index a37aee7cd..82749d233 100644 --- a/moto/backend_index.py +++ b/moto/backend_index.py @@ -123,6 +123,7 @@ backend_url_patterns = [ ("sqs", re.compile("https?://(.*\\.)?(queue|sqs)\\.(.*\\.)?amazonaws\\.com")), ("ssm", re.compile("https?://ssm\\.(.+)\\.amazonaws\\.com")), ("ssm", re.compile("https?://ssm\\.(.+)\\.amazonaws\\.com\\.cn")), + ("sso-admin", re.compile("https?://sso\\.(.+)\\.amazonaws\\.com")), ("stepfunctions", re.compile("https?://states\\.(.+)\\.amazonaws.com")), ("sts", re.compile("https?://sts\\.(.*\\.)?amazonaws\\.com")), ("support", re.compile("https?://support\\.(.+)\\.amazonaws\\.com")), diff --git a/moto/ssoadmin/__init__.py b/moto/ssoadmin/__init__.py new file mode 100644 index 000000000..a6c002391 --- /dev/null +++ b/moto/ssoadmin/__init__.py @@ -0,0 +1,5 @@ +"""ssoadmin module initialization; sets value for base decorator.""" +from .models import ssoadmin_backends +from ..core.models import base_decorator + +mock_ssoadmin = base_decorator(ssoadmin_backends) diff --git a/moto/ssoadmin/exceptions.py b/moto/ssoadmin/exceptions.py new file mode 100644 index 000000000..a133e1d5e --- /dev/null +++ b/moto/ssoadmin/exceptions.py @@ -0,0 +1,7 @@ +"""Exceptions raised by the ssoadmin service.""" +from moto.core.exceptions import JsonRESTError + + +class ResourceNotFound(JsonRESTError): + def __init__(self): + super().__init__("ResourceNotFound", "Account not found") diff --git a/moto/ssoadmin/models.py b/moto/ssoadmin/models.py new file mode 100644 index 000000000..1f7d79037 --- /dev/null +++ b/moto/ssoadmin/models.py @@ -0,0 +1,142 @@ +from .exceptions import ResourceNotFound + +from moto.core import BaseBackend, BaseModel +from moto.core.utils import BackendDict, unix_time +from uuid import uuid4 + + +class AccountAssignment(BaseModel): + def __init__( + self, + instance_arn, + target_id, + target_type, + permission_set_arn, + principal_type, + principal_id, + ): + self.request_id = str(uuid4()) + self.instance_arn = instance_arn + self.target_id = target_id + self.target_type = target_type + self.permission_set_arn = permission_set_arn + self.principal_type = principal_type + self.principal_id = principal_id + self.created_date = unix_time() + + def to_json(self, include_creation_date=False): + summary = { + "TargetId": self.target_id, + "TargetType": self.target_type, + "PermissionSetArn": self.permission_set_arn, + "PrincipalType": self.principal_type, + "PrincipalId": self.principal_id, + } + if include_creation_date: + summary["CreatedDate"] = self.created_date + return summary + + +class SSOAdminBackend(BaseBackend): + """Implementation of SSOAdmin APIs.""" + + def __init__(self, region_name=None): + self.region_name = region_name + self.account_assignments = list() + + def reset(self): + """Re-initialize all attributes for this instance.""" + region_name = self.region_name + self.__dict__ = {} + self.__init__(region_name) + + def create_account_assignment( + self, + instance_arn, + target_id, + target_type, + permission_set_arn, + principal_type, + principal_id, + ): + assignment = AccountAssignment( + instance_arn, + target_id, + target_type, + permission_set_arn, + principal_type, + principal_id, + ) + self.account_assignments.append(assignment) + return assignment.to_json() + + def delete_account_assignment( + self, + instance_arn, + target_id, + target_type, + permission_set_arn, + principal_type, + principal_id, + ): + account = self._find_account( + instance_arn, + target_id, + target_type, + permission_set_arn, + principal_type, + principal_id, + ) + self.account_assignments.remove(account) + return account.to_json(include_creation_date=True) + + def _find_account( + self, + instance_arn, + target_id, + target_type, + permission_set_arn, + principal_type, + principal_id, + ): + for account in self.account_assignments: + instance_arn_match = account.instance_arn == instance_arn + target_id_match = account.target_id == target_id + target_type_match = account.target_type == target_type + permission_set_match = account.permission_set_arn == permission_set_arn + principal_type_match = account.principal_type == principal_type + principal_id_match = account.principal_id == principal_id + if ( + instance_arn_match + and target_id_match + and target_type_match + and permission_set_match + and principal_type_match + and principal_id_match + ): + return account + raise ResourceNotFound + + def list_account_assignments(self, instance_arn, account_id, permission_set_arn): + """ + Pagination has not yet been implemented + """ + account_assignments = [] + for assignment in self.account_assignments: + if ( + assignment.instance_arn == instance_arn + and assignment.target_id == account_id + and assignment.permission_set_arn == permission_set_arn + ): + account_assignments.append( + { + "AccountId": account_id, + "PermissionSetArn": assignment.permission_set_arn, + "PrincipalType": assignment.principal_type, + "PrincipalId": assignment.principal_id, + } + ) + return account_assignments + + +ssoadmin_backends = BackendDict(SSOAdminBackend, "sso") diff --git a/moto/ssoadmin/responses.py b/moto/ssoadmin/responses.py new file mode 100644 index 000000000..43b8091ab --- /dev/null +++ b/moto/ssoadmin/responses.py @@ -0,0 +1,67 @@ +import json + +from moto.core.responses import BaseResponse +from uuid import uuid4 + +from .models import ssoadmin_backends + + +class SSOAdminResponse(BaseResponse): + """Handler for SSOAdmin requests and responses.""" + + @property + def ssoadmin_backend(self): + """Return backend instance specific for this region.""" + return ssoadmin_backends[self.region] + + def create_account_assignment(self): + params = json.loads(self.body) + instance_arn = params.get("InstanceArn") + target_id = params.get("TargetId") + target_type = params.get("TargetType") + permission_set_arn = params.get("PermissionSetArn") + principal_type = params.get("PrincipalType") + principal_id = params.get("PrincipalId") + summary = self.ssoadmin_backend.create_account_assignment( + instance_arn=instance_arn, + target_id=target_id, + target_type=target_type, + permission_set_arn=permission_set_arn, + principal_type=principal_type, + principal_id=principal_id, + ) + summary["Status"] = "SUCCEEDED" + summary["RequestId"] = str(uuid4()) + return json.dumps({"AccountAssignmentCreationStatus": summary}) + + def delete_account_assignment(self): + params = json.loads(self.body) + instance_arn = params.get("InstanceArn") + target_id = params.get("TargetId") + target_type = params.get("TargetType") + permission_set_arn = params.get("PermissionSetArn") + principal_type = params.get("PrincipalType") + principal_id = params.get("PrincipalId") + summary = self.ssoadmin_backend.delete_account_assignment( + instance_arn=instance_arn, + target_id=target_id, + target_type=target_type, + permission_set_arn=permission_set_arn, + principal_type=principal_type, + principal_id=principal_id, + ) + summary["Status"] = "SUCCEEDED" + summary["RequestId"] = str(uuid4()) + return json.dumps({"AccountAssignmentDeletionStatus": summary}) + + def list_account_assignments(self): + params = json.loads(self.body) + instance_arn = params.get("InstanceArn") + account_id = params.get("AccountId") + permission_set_arn = params.get("PermissionSetArn") + assignments = self.ssoadmin_backend.list_account_assignments( + instance_arn=instance_arn, + account_id=account_id, + permission_set_arn=permission_set_arn, + ) + return json.dumps({"AccountAssignments": assignments}) diff --git a/moto/ssoadmin/urls.py b/moto/ssoadmin/urls.py new file mode 100644 index 000000000..531968fd6 --- /dev/null +++ b/moto/ssoadmin/urls.py @@ -0,0 +1,11 @@ +"""ssoadmin base URL and path.""" +from .responses import SSOAdminResponse + +url_bases = [ + r"https?://sso\.(.+)\.amazonaws\.com", +] + + +url_paths = { + "{0}/$": SSOAdminResponse.dispatch, +} diff --git a/tests/test_ssoadmin/__init__.py b/tests/test_ssoadmin/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_ssoadmin/test_server.py b/tests/test_ssoadmin/test_server.py new file mode 100644 index 000000000..ccf27a575 --- /dev/null +++ b/tests/test_ssoadmin/test_server.py @@ -0,0 +1,24 @@ +import json +import sure # noqa # pylint: disable=unused-import + +import moto.server as server + + +def test_ssoadmin_list(): + backend = server.create_backend_app("sso-admin") + test_client = backend.test_client() + + headers = { + "X-Amz-Target": "SWBExternalService.ListAccountAssignments", + "User-Agent": "aws-cli/2.2.47 Python/3.8.8 Linux/5.11.0-44-generic exe/x86_64.ubuntu.20 prompt/off command/sso-admin.list-account-assignments", + } + data = { + "InstanceArn": "arn:aws:sso:::instance/ins-aaaabbbbccccdddd", + "AccountId": "222222222222", + "PermissionSetArn": "arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo", + } + + resp = test_client.post("/", headers=headers, data=json.dumps(data)) + + resp.status_code.should.equal(200) + json.loads(resp.data).should.equal({"AccountAssignments": []}) diff --git a/tests/test_ssoadmin/test_ssoadmin.py b/tests/test_ssoadmin/test_ssoadmin.py new file mode 100644 index 000000000..64091ad33 --- /dev/null +++ b/tests/test_ssoadmin/test_ssoadmin.py @@ -0,0 +1,190 @@ +import boto3 +import datetime +import pytest +import sure # noqa # pylint: disable=unused-import + +from botocore.exceptions import ClientError +from moto import mock_ssoadmin +from uuid import uuid4 + +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html + + +@mock_ssoadmin +def test_create_account_assignment(): + client = boto3.client("sso-admin", region_name="eu-west-1") + target_id = "222222222222" + permission_set_arn = ( + "arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo" + ) + principal_id = str(uuid4()) + + resp = client.create_account_assignment( + InstanceArn="arn:aws:sso:::instance/ins-aaaabbbbccccdddd", + TargetId=target_id, + TargetType="AWS_ACCOUNT", + PermissionSetArn=permission_set_arn, + PrincipalType="USER", + PrincipalId=principal_id, + ) + + resp.should.have.key("AccountAssignmentCreationStatus") + + status = resp["AccountAssignmentCreationStatus"] + status.should.have.key("Status").equals("SUCCEEDED") + status.should.have.key("RequestId") + status.shouldnt.have.key("FailureReason") + status.should.have.key("TargetId").equals(target_id) + status.should.have.key("TargetType").equals("AWS_ACCOUNT") + status.should.have.key("PermissionSetArn").equals(permission_set_arn) + status.should.have.key("PrincipalType").equals("USER") + status.should.have.key("PrincipalId").equals(principal_id) + + +@mock_ssoadmin +def test_delete_account_assignment(): + client = boto3.client("sso-admin", region_name="eu-west-1") + target_id = "222222222222" + permission_set_arn = ( + "arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo" + ) + principal_id = str(uuid4()) + instance_arn = "arn:aws:sso:::instance/ins-aaaabbbbccccdddd" + + client.create_account_assignment( + InstanceArn=instance_arn, + TargetId=target_id, + TargetType="AWS_ACCOUNT", + PermissionSetArn=permission_set_arn, + PrincipalType="USER", + PrincipalId=principal_id, + ) + + resp = client.delete_account_assignment( + InstanceArn=instance_arn, + TargetId=target_id, + TargetType="AWS_ACCOUNT", + PermissionSetArn=permission_set_arn, + PrincipalType="USER", + PrincipalId=principal_id, + ) + resp.should.have.key("AccountAssignmentDeletionStatus") + + # Verify the correct response + status = resp["AccountAssignmentDeletionStatus"] + status.should.have.key("Status").equals("SUCCEEDED") + status.should.have.key("RequestId") + status.shouldnt.have.key("FailureReason") + status.should.have.key("TargetId").equals(target_id) + status.should.have.key("TargetType").equals("AWS_ACCOUNT") + status.should.have.key("PermissionSetArn").equals(permission_set_arn) + status.should.have.key("PrincipalType").equals("USER") + status.should.have.key("PrincipalId").equals(principal_id) + status.should.have.key("CreatedDate").should.be.a(datetime.datetime) + + # Verify this account assignment can no longer be found + resp = client.list_account_assignments( + InstanceArn=instance_arn, + AccountId=target_id, + PermissionSetArn=permission_set_arn, + ) + + resp.should.have.key("AccountAssignments").equals([]) + + +@mock_ssoadmin +def test_delete_account_assignment_unknown(): + client = boto3.client("sso-admin", region_name="us-east-1") + + target_id = "222222222222" + permission_set_arn = ( + "arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo" + ) + principal_id = str(uuid4()) + instance_arn = "arn:aws:sso:::instance/ins-aaaabbbbccccdddd" + + with pytest.raises(ClientError) as exc: + client.delete_account_assignment( + InstanceArn=instance_arn, + TargetId=target_id, + TargetType="AWS_ACCOUNT", + PermissionSetArn=permission_set_arn, + PrincipalType="USER", + PrincipalId=principal_id, + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ResourceNotFound") + + +@mock_ssoadmin +def test_list_account_assignments(): + client = boto3.client("sso-admin", region_name="ap-southeast-1") + + target_id1 = "222222222222" + target_id2 = "333333333333" + permission_set_arn = ( + "arn:aws:sso:::permissionSet/ins-eeeeffffgggghhhh/ps-hhhhkkkkppppoooo" + ) + principal_id = str(uuid4()) + instance_arn = "arn:aws:sso:::instance/ins-aaaabbbbccccdddd" + + resp = client.list_account_assignments( + InstanceArn=instance_arn, + AccountId=target_id1, + PermissionSetArn=permission_set_arn, + ) + + resp.should.have.key("AccountAssignments").equals([]) + + client.create_account_assignment( + InstanceArn=instance_arn, + TargetId=target_id1, + TargetType="AWS_ACCOUNT", + PermissionSetArn=permission_set_arn, + PrincipalType="USER", + PrincipalId=principal_id, + ) + + resp = client.list_account_assignments( + InstanceArn=instance_arn, + AccountId=target_id1, + PermissionSetArn=permission_set_arn, + ) + + resp.should.have.key("AccountAssignments").equals( + [ + { + "AccountId": target_id1, + "PermissionSetArn": permission_set_arn, + "PrincipalType": "USER", + "PrincipalId": principal_id, + } + ] + ) + + client.create_account_assignment( + InstanceArn=instance_arn, + TargetId=target_id2, + TargetType="AWS_ACCOUNT", + PermissionSetArn=permission_set_arn, + PrincipalType="USER", + PrincipalId=principal_id, + ) + + resp = client.list_account_assignments( + InstanceArn=instance_arn, + AccountId=target_id2, + PermissionSetArn=permission_set_arn, + ) + + resp.should.have.key("AccountAssignments").equals( + [ + { + "AccountId": target_id2, + "PermissionSetArn": permission_set_arn, + "PrincipalType": "USER", + "PrincipalId": principal_id, + } + ] + )