From b6a582e6244f705c4e3234b25d86e09c2d1d0100 Mon Sep 17 00:00:00 2001 From: Paul Date: Thu, 24 Aug 2023 15:09:38 -0400 Subject: [PATCH] Cognito-IDP describe_resource_server() and list_resource_servers() (#6717) --- docs/docs/services/cognito-idp.rst | 4 +- moto/backend_index.py | 4 +- moto/cognitoidp/models.py | 19 ++ moto/cognitoidp/responses.py | 24 +++ moto/cognitoidp/utils.py | 6 + tests/test_cognitoidp/test_cognitoidp.py | 220 ++++++++++++++++++++++- 6 files changed, 272 insertions(+), 5 deletions(-) diff --git a/docs/docs/services/cognito-idp.rst b/docs/docs/services/cognito-idp.rst index 3f1c67deb..b5021803d 100644 --- a/docs/docs/services/cognito-idp.rst +++ b/docs/docs/services/cognito-idp.rst @@ -75,7 +75,7 @@ cognito-idp - [X] delete_user_pool_client - [X] delete_user_pool_domain - [X] describe_identity_provider -- [ ] describe_resource_server +- [X] describe_resource_server - [ ] describe_risk_configuration - [ ] describe_user_import_job - [X] describe_user_pool @@ -108,7 +108,7 @@ cognito-idp - [ ] list_devices - [X] list_groups - [X] list_identity_providers -- [ ] list_resource_servers +- [X] list_resource_servers - [ ] list_tags_for_resource - [ ] list_user_import_jobs - [X] list_user_pool_clients diff --git a/moto/backend_index.py b/moto/backend_index.py index fc1508f28..129e7bd7a 100644 --- a/moto/backend_index.py +++ b/moto/backend_index.py @@ -1,4 +1,4 @@ -# autogenerated by scripts/update_backend_index.py +# autogenerated by /Users/plussier/dev/github/moto/scripts/update_backend_index.py import re backend_url_patterns = [ @@ -114,7 +114,7 @@ backend_url_patterns = [ ), ( "meteringmarketplace", - re.compile("https?://metering\\.marketplace.(.+).amazonaws.com"), + re.compile("https?://metering.marketplace.(.+).amazonaws.com"), ), ("meteringmarketplace", re.compile("https?://aws-marketplace.(.+).amazonaws.com")), ("mq", re.compile("https?://mq\\.(.+)\\.amazonaws\\.com")), diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index 9fa9711d4..0c8c05399 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -1705,6 +1705,25 @@ class CognitoIdpBackend(BaseBackend): user_pool.resource_servers[identifier] = resource_server return resource_server + def describe_resource_server( + self, user_pool_id: str, identifier: str + ) -> CognitoResourceServer: + user_pool = self.user_pools.get(user_pool_id) + if not user_pool: + raise ResourceNotFoundError(f"User pool {user_pool_id} does not exist.") + + resource_server = user_pool.resource_servers.get(identifier) + if not resource_server: + raise ResourceNotFoundError(f"Resource server {identifier} does not exist.") + + return resource_server + + @paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc] + def list_resource_servers(self, user_pool_id: str) -> List[CognitoResourceServer]: + user_pool = self.user_pools[user_pool_id] + resource_servers = list(user_pool.resource_servers.values()) + return resource_servers + def sign_up( self, client_id: str, diff --git a/moto/cognitoidp/responses.py b/moto/cognitoidp/responses.py index cb767d7ea..a76369c94 100644 --- a/moto/cognitoidp/responses.py +++ b/moto/cognitoidp/responses.py @@ -534,6 +534,30 @@ class CognitoIdpResponse(BaseResponse): ) return json.dumps({"ResourceServer": resource_server.to_json()}) + def describe_resource_server(self) -> str: + user_pool_id = self._get_param("UserPoolId") + identifier = self._get_param("Identifier") + resource_server = self.backend.describe_resource_server( + user_pool_id, identifier + ) + return json.dumps({"ResourceServer": resource_server.to_json()}) + + def list_resource_servers(self) -> str: + max_results = self._get_param("MaxResults") + next_token = self._get_param("NextToken") + user_pool_id = self._get_param("UserPoolId") + resource_servers, next_token = self.backend.list_resource_servers( + user_pool_id, max_results=max_results, next_token=next_token + ) + response: Dict[str, Any] = { + "ResourceServers": [ + resource_server.to_json() for resource_server in resource_servers + ] + } + if next_token: + response["NextToken"] = str(next_token) + return json.dumps(response) + def sign_up(self) -> str: client_id = self._get_param("ClientId") username = self._get_param("Username") diff --git a/moto/cognitoidp/utils.py b/moto/cognitoidp/utils.py index 2a9f8daa9..de5077fc5 100644 --- a/moto/cognitoidp/utils.py +++ b/moto/cognitoidp/utils.py @@ -49,6 +49,12 @@ PAGINATION_MODEL = { "limit_default": 60, "unique_attribute": "id", }, + "list_resource_servers": { + "input_token": "next_token", + "limit_key": "max_results", + "limit_default": 60, + "unique_attribute": "identifier", + }, } diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index 33570847a..fa22ea810 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -3537,7 +3537,7 @@ def test_update_user_attributes_unknown_accesstoken(): @mock_cognitoidp -def test_resource_server(): +def test_create_resource_server(): client = boto3.client("cognito-idp", "us-west-2") name = str(uuid.uuid4()) res = client.create_user_pool(PoolName=name) @@ -3573,6 +3573,224 @@ def test_resource_server(): assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 +@mock_cognitoidp +def test_describe_resource_server(): + + # Create a user pool to attach a resource server to + client = boto3.client("cognito-idp", "us-west-2") + name = str(uuid.uuid4()) + user_pool = client.create_user_pool(PoolName=name) + user_pool_id = user_pool["UserPool"]["Id"] + + server_id = "my_server" + server_name = "new_remote_server" + scopes = [ + {"ScopeName": "app:write", "ScopeDescription": "write scope"}, + {"ScopeName": "app:read", "ScopeDescription": "read scope"}, + ] + + # Create a new resource server + new_resource_server = client.create_resource_server( + UserPoolId=user_pool_id, Identifier=server_id, Name=server_name, Scopes=scopes + ) + + assert new_resource_server["ResourceServer"]["UserPoolId"] == user_pool_id + assert new_resource_server["ResourceServer"]["Identifier"] == server_id + assert new_resource_server["ResourceServer"]["Name"] == server_name + assert new_resource_server["ResourceServer"]["Scopes"] == scopes + + # Describe the newly created resource server + response = client.describe_resource_server( + UserPoolId=user_pool_id, Identifier=server_id + ) + + # Assert all the values we expect are seen in the description. + assert response["ResourceServer"]["UserPoolId"] == user_pool_id + assert response["ResourceServer"]["Identifier"] == server_id + assert response["ResourceServer"]["Name"] == server_name + assert response["ResourceServer"]["Scopes"] == scopes + + # Make sure attempting to describe a non-existent server fails in + # the expected manner + fake_server_id = "non_existent_server" + negative_response = None + with pytest.raises(ClientError) as ex: + negative_response = client.describe_resource_server( + UserPoolId=user_pool_id, Identifier=fake_server_id + ) + + # Assert that error message content is what's expected for a failure. + assert negative_response is None + assert ex.value.operation_name == "DescribeResourceServer" + assert ex.value.response["Error"]["Code"] == "ResourceNotFoundException" + assert ( + ex.value.response["Error"]["Message"] + == f"Resource server {fake_server_id} does not exist." + ) + assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 + + +@mock_cognitoidp +def test_list_resource_servers_empty_set(): + + # Create a user pool to attach a resource server to + client = boto3.client("cognito-idp", "us-west-2") + name = str(uuid.uuid4()) + user_pool = client.create_user_pool(PoolName=name) + user_pool_id = user_pool["UserPool"]["Id"] + + # Empty list, because we aren't creating any. + all_resource_svrs = [] + + max_return = 50 + servers = client.list_resource_servers( + UserPoolId=user_pool_id, MaxResults=max_return + ) + + expected_keys = ["ResourceServers", "ResponseMetadata"] + assert all(key in servers for key in expected_keys) + assert servers["ResponseMetadata"].get("HTTPStatusCode") + assert servers["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert servers.get("NextToken", False) is False + assert len(servers["ResourceServers"]) == len(all_resource_svrs) + assert len(servers["ResourceServers"]) == 0 + + +@mock_cognitoidp +def test_list_resource_servers_single_page(): + + # Create a user pool to attach a resource server to + client = boto3.client("cognito-idp", "us-west-2") + name = str(uuid.uuid4()) + user_pool = client.create_user_pool(PoolName=name) + user_pool_id = user_pool["UserPool"]["Id"] + create_num = 48 + + all_resource_svrs = [] + for id_num in range(0, create_num, 1): + server_id = f"my_server{id_num}" + server_name = "new_remote_server{id_num}" + scopes = [ + { + "ScopeName": f"app:write{id_num}", + "ScopeDescription": f"write scope{id_num}", + }, + { + "ScopeName": f"app:read{id_num}", + "ScopeDescription": f"read scope{id_num}", + }, + ] + + # Create a new resource server + new_resource_server = client.create_resource_server( + UserPoolId=user_pool_id, + Identifier=server_id, + Name=server_name, + Scopes=scopes, + ) + + all_resource_svrs.append(new_resource_server) + + max_return = 50 + servers = client.list_resource_servers( + UserPoolId=user_pool_id, MaxResults=max_return + ) + + expected_keys = ["ResourceServers", "ResponseMetadata"] + assert all(key in servers for key in expected_keys) + assert servers["ResponseMetadata"].get("HTTPStatusCode") + assert servers["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert servers.get("NextToken", False) is False + assert len(servers["ResourceServers"]) == create_num + returned_servers = servers["ResourceServers"] + + for idx in range(0, create_num - 1, 1): + for key in returned_servers[idx].keys(): + assert ( + returned_servers[idx][key] + == all_resource_svrs[idx]["ResourceServer"][key] + ) + + +@mock_cognitoidp +def test_list_resource_servers_multi_page(): + + # Create a user pool to attach a resource server to + client = boto3.client("cognito-idp", "us-west-2") + name = str(uuid.uuid4()) + user_pool = client.create_user_pool(PoolName=name) + user_pool_id = user_pool["UserPool"]["Id"] + create_num = 65 + + all_resource_svrs = [] + for id_num in range(0, create_num, 1): + server_id = f"my_server{id_num}" + server_name = "new_remote_server{id_num}" + scopes = [ + { + "ScopeName": f"app:write{id_num}", + "ScopeDescription": f"write scope{id_num}", + }, + { + "ScopeName": f"app:read{id_num}", + "ScopeDescription": f"read scope{id_num}", + }, + ] + + # Create a new resource server + new_resource_server = client.create_resource_server( + UserPoolId=user_pool_id, + Identifier=server_id, + Name=server_name, + Scopes=scopes, + ) + + all_resource_svrs.append(new_resource_server) + + max_return = 50 + servers = client.list_resource_servers( + UserPoolId=user_pool_id, MaxResults=max_return + ) + + expected_keys = ["ResourceServers", "NextToken", "ResponseMetadata"] + assert all(key in servers for key in expected_keys) + assert servers["ResponseMetadata"].get("HTTPStatusCode") + assert servers["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert servers.get("NextToken", False) + assert len(servers["ResourceServers"]) == max_return + returned_servers = servers["ResourceServers"] + + for idx in range(0, max_return - 1, 1): + for key in returned_servers[idx].keys(): + assert ( + returned_servers[idx][key] + == all_resource_svrs[idx]["ResourceServer"][key] + ) + + next_page = client.list_resource_servers( + UserPoolId=user_pool_id, MaxResults=max_return, NextToken=servers["NextToken"] + ) + + expected_keys = ["ResourceServers", "ResponseMetadata"] + expected_returns = create_num - max_return + assert all(key in next_page for key in expected_keys) + assert next_page["ResponseMetadata"].get("HTTPStatusCode") + assert next_page["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert next_page.get("NextToken", False) is False + assert len(next_page["ResourceServers"]) == expected_returns + returned_servers = next_page["ResourceServers"] + + # Check the second page of results + # Each entry in the second page should be the offset of 'max_return + idx' in all_resource_svrs + for idx in range(0, expected_returns, 1): + for key in returned_servers[idx].keys(): + all_idx = idx + max_return + assert ( + returned_servers[idx][key] + == all_resource_svrs[all_idx]["ResourceServer"][key] + ) + + @mock_cognitoidp def test_sign_up(): conn = boto3.client("cognito-idp", "us-west-2")