From c436bb93ef31b49306910fce706b98ee375a0ae1 Mon Sep 17 00:00:00 2001 From: Joel McCoy Date: Wed, 3 Jan 2024 14:03:21 -0600 Subject: [PATCH] sso-admin: list_account_assignments() pagination (#7181) --- moto/ssoadmin/models.py | 6 +-- moto/ssoadmin/responses.py | 10 ++++- moto/ssoadmin/utils.py | 14 ++++++- tests/test_ssoadmin/test_server.py | 2 +- tests/test_ssoadmin/test_ssoadmin.py | 60 ++++++++++++++++++++++++++++ 5 files changed, 84 insertions(+), 8 deletions(-) diff --git a/moto/ssoadmin/models.py b/moto/ssoadmin/models.py index 4a5097525..69f4cea92 100644 --- a/moto/ssoadmin/models.py +++ b/moto/ssoadmin/models.py @@ -157,12 +157,10 @@ class SSOAdminBackend(BaseBackend): return account raise ResourceNotFound + @paginate(PAGINATION_MODEL) # type: ignore[misc] def list_account_assignments( self, instance_arn: str, account_id: str, permission_set_arn: str ) -> List[Dict[str, Any]]: - """ - Pagination has not yet been implemented - """ account_assignments = [] for assignment in self.account_assignments: if ( @@ -172,7 +170,7 @@ class SSOAdminBackend(BaseBackend): ): account_assignments.append( { - "AccountId": account_id, + "AccountId": assignment.target_id, "PermissionSetArn": assignment.permission_set_arn, "PrincipalType": assignment.principal_type, "PrincipalId": assignment.principal_id, diff --git a/moto/ssoadmin/responses.py b/moto/ssoadmin/responses.py index e89ca0e89..e18316556 100644 --- a/moto/ssoadmin/responses.py +++ b/moto/ssoadmin/responses.py @@ -62,12 +62,18 @@ class SSOAdminResponse(BaseResponse): instance_arn = params.get("InstanceArn") account_id = params.get("AccountId") permission_set_arn = params.get("PermissionSetArn") - assignments = self.ssoadmin_backend.list_account_assignments( + max_results = self._get_param("MaxResults") + next_token = self._get_param("NextToken") + + assignments, next_token = self.ssoadmin_backend.list_account_assignments( instance_arn=instance_arn, account_id=account_id, permission_set_arn=permission_set_arn, + next_token=next_token, + max_results=max_results, ) - return json.dumps({"AccountAssignments": assignments}) + + return json.dumps(dict(AccountAssignments=assignments, NextToken=next_token)) def list_account_assignments_for_principal(self) -> str: filter_ = self._get_param("Filter", {}) diff --git a/moto/ssoadmin/utils.py b/moto/ssoadmin/utils.py index 45fc457f2..e7c3230ed 100644 --- a/moto/ssoadmin/utils.py +++ b/moto/ssoadmin/utils.py @@ -10,7 +10,19 @@ PAGINATION_MODEL = { "input_token": "next_token", "limit_key": "max_results", "limit_default": 100, - "result_key": "PermissionSets", + "result_key": "AccountAssignments", + "unique_attribute": [ + "AccountId", + "PermissionSetArn", + "PrincipalId", + "PrincipalType", + ], + }, + "list_account_assignments": { + "input_token": "next_token", + "limit_key": "max_results", + "limit_default": 100, + "result_key": "AccountAssignments", "unique_attribute": [ "AccountId", "PermissionSetArn", diff --git a/tests/test_ssoadmin/test_server.py b/tests/test_ssoadmin/test_server.py index 51623d78a..5e76e1b69 100644 --- a/tests/test_ssoadmin/test_server.py +++ b/tests/test_ssoadmin/test_server.py @@ -23,4 +23,4 @@ def test_ssoadmin_list(): resp = test_client.post("/", headers=headers, data=json.dumps(data)) assert resp.status_code == 200 - assert json.loads(resp.data) == {"AccountAssignments": []} + assert json.loads(resp.data) == {"AccountAssignments": [], "NextToken": None} diff --git a/tests/test_ssoadmin/test_ssoadmin.py b/tests/test_ssoadmin/test_ssoadmin.py index ab1e88a17..b2104bd8b 100644 --- a/tests/test_ssoadmin/test_ssoadmin.py +++ b/tests/test_ssoadmin/test_ssoadmin.py @@ -190,6 +190,66 @@ def test_list_account_assignments(): ] +@mock_ssoadmin +def test_list_account_assignments_pagination(): + client = boto3.client("sso-admin", region_name="ap-southeast-1") + DUMMY_AWS_ACCOUNT_ID = "111111111111" + + dummy_account_assignments = [] + for _ in range(3): + dummy_account_assignments.append( + { + "InstanceArn": DUMMY_INSTANCE_ARN, + "TargetId": DUMMY_AWS_ACCOUNT_ID, + "TargetType": "AWS_ACCOUNT", + "PermissionSetArn": DUMMY_PERMISSIONSET_ID, + "PrincipalType": "USER", + "PrincipalId": str(uuid4()), + }, + ) + + for dummy_account_assignment in dummy_account_assignments: + client.create_account_assignment(**dummy_account_assignment) + + account_assignments = [] + + response = client.list_account_assignments( + InstanceArn=DUMMY_INSTANCE_ARN, + AccountId=DUMMY_AWS_ACCOUNT_ID, + PermissionSetArn=DUMMY_PERMISSIONSET_ID, + MaxResults=2, + ) + + assert len(response["AccountAssignments"]) == 2 + account_assignments.extend(response["AccountAssignments"]) + + next_token = response["NextToken"] + + response = client.list_account_assignments( + InstanceArn=DUMMY_INSTANCE_ARN, + AccountId=DUMMY_AWS_ACCOUNT_ID, + PermissionSetArn=DUMMY_PERMISSIONSET_ID, + MaxResults=2, + NextToken=next_token, + ) + + assert len(response["AccountAssignments"]) == 1 + account_assignments.extend(response["AccountAssignments"]) + + # ensure 3 unique assignments returned + assert ( + len( + set( + [ + account_assignment["PrincipalId"] + for account_assignment in account_assignments + ] + ) + ) + == 3 + ) + + @mock_ssoadmin def test_list_account_assignments_for_principal(): client = boto3.client("sso-admin", region_name="us-west-2")