sso-admin: list_account_assignments() pagination (#7181)

This commit is contained in:
Joel McCoy 2024-01-03 14:03:21 -06:00 committed by GitHub
parent ea47ba7939
commit c436bb93ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 84 additions and 8 deletions

View File

@ -157,12 +157,10 @@ class SSOAdminBackend(BaseBackend):
return account return account
raise ResourceNotFound raise ResourceNotFound
@paginate(PAGINATION_MODEL) # type: ignore[misc]
def list_account_assignments( def list_account_assignments(
self, instance_arn: str, account_id: str, permission_set_arn: str self, instance_arn: str, account_id: str, permission_set_arn: str
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""
Pagination has not yet been implemented
"""
account_assignments = [] account_assignments = []
for assignment in self.account_assignments: for assignment in self.account_assignments:
if ( if (
@ -172,7 +170,7 @@ class SSOAdminBackend(BaseBackend):
): ):
account_assignments.append( account_assignments.append(
{ {
"AccountId": account_id, "AccountId": assignment.target_id,
"PermissionSetArn": assignment.permission_set_arn, "PermissionSetArn": assignment.permission_set_arn,
"PrincipalType": assignment.principal_type, "PrincipalType": assignment.principal_type,
"PrincipalId": assignment.principal_id, "PrincipalId": assignment.principal_id,

View File

@ -62,12 +62,18 @@ class SSOAdminResponse(BaseResponse):
instance_arn = params.get("InstanceArn") instance_arn = params.get("InstanceArn")
account_id = params.get("AccountId") account_id = params.get("AccountId")
permission_set_arn = params.get("PermissionSetArn") permission_set_arn = params.get("PermissionSetArn")
assignments = self.ssoadmin_backend.list_account_assignments( max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken")
assignments, next_token = self.ssoadmin_backend.list_account_assignments(
instance_arn=instance_arn, instance_arn=instance_arn,
account_id=account_id, account_id=account_id,
permission_set_arn=permission_set_arn, permission_set_arn=permission_set_arn,
next_token=next_token,
max_results=max_results,
) )
return json.dumps({"AccountAssignments": assignments})
return json.dumps(dict(AccountAssignments=assignments, NextToken=next_token))
def list_account_assignments_for_principal(self) -> str: def list_account_assignments_for_principal(self) -> str:
filter_ = self._get_param("Filter", {}) filter_ = self._get_param("Filter", {})

View File

@ -10,7 +10,19 @@ PAGINATION_MODEL = {
"input_token": "next_token", "input_token": "next_token",
"limit_key": "max_results", "limit_key": "max_results",
"limit_default": 100, "limit_default": 100,
"result_key": "PermissionSets", "result_key": "AccountAssignments",
"unique_attribute": [
"AccountId",
"PermissionSetArn",
"PrincipalId",
"PrincipalType",
],
},
"list_account_assignments": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 100,
"result_key": "AccountAssignments",
"unique_attribute": [ "unique_attribute": [
"AccountId", "AccountId",
"PermissionSetArn", "PermissionSetArn",

View File

@ -23,4 +23,4 @@ def test_ssoadmin_list():
resp = test_client.post("/", headers=headers, data=json.dumps(data)) resp = test_client.post("/", headers=headers, data=json.dumps(data))
assert resp.status_code == 200 assert resp.status_code == 200
assert json.loads(resp.data) == {"AccountAssignments": []} assert json.loads(resp.data) == {"AccountAssignments": [], "NextToken": None}

View File

@ -190,6 +190,66 @@ def test_list_account_assignments():
] ]
@mock_ssoadmin
def test_list_account_assignments_pagination():
client = boto3.client("sso-admin", region_name="ap-southeast-1")
DUMMY_AWS_ACCOUNT_ID = "111111111111"
dummy_account_assignments = []
for _ in range(3):
dummy_account_assignments.append(
{
"InstanceArn": DUMMY_INSTANCE_ARN,
"TargetId": DUMMY_AWS_ACCOUNT_ID,
"TargetType": "AWS_ACCOUNT",
"PermissionSetArn": DUMMY_PERMISSIONSET_ID,
"PrincipalType": "USER",
"PrincipalId": str(uuid4()),
},
)
for dummy_account_assignment in dummy_account_assignments:
client.create_account_assignment(**dummy_account_assignment)
account_assignments = []
response = client.list_account_assignments(
InstanceArn=DUMMY_INSTANCE_ARN,
AccountId=DUMMY_AWS_ACCOUNT_ID,
PermissionSetArn=DUMMY_PERMISSIONSET_ID,
MaxResults=2,
)
assert len(response["AccountAssignments"]) == 2
account_assignments.extend(response["AccountAssignments"])
next_token = response["NextToken"]
response = client.list_account_assignments(
InstanceArn=DUMMY_INSTANCE_ARN,
AccountId=DUMMY_AWS_ACCOUNT_ID,
PermissionSetArn=DUMMY_PERMISSIONSET_ID,
MaxResults=2,
NextToken=next_token,
)
assert len(response["AccountAssignments"]) == 1
account_assignments.extend(response["AccountAssignments"])
# ensure 3 unique assignments returned
assert (
len(
set(
[
account_assignment["PrincipalId"]
for account_assignment in account_assignments
]
)
)
== 3
)
@mock_ssoadmin @mock_ssoadmin
def test_list_account_assignments_for_principal(): def test_list_account_assignments_for_principal():
client = boto3.client("sso-admin", region_name="us-west-2") client = boto3.client("sso-admin", region_name="us-west-2")