Cognito-IDP describe_resource_server() and list_resource_servers() (#6717)

This commit is contained in:
Paul 2023-08-24 15:09:38 -04:00 committed by GitHub
parent d351853276
commit b6a582e624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 272 additions and 5 deletions

View File

@ -75,7 +75,7 @@ cognito-idp
- [X] delete_user_pool_client - [X] delete_user_pool_client
- [X] delete_user_pool_domain - [X] delete_user_pool_domain
- [X] describe_identity_provider - [X] describe_identity_provider
- [ ] describe_resource_server - [X] describe_resource_server
- [ ] describe_risk_configuration - [ ] describe_risk_configuration
- [ ] describe_user_import_job - [ ] describe_user_import_job
- [X] describe_user_pool - [X] describe_user_pool
@ -108,7 +108,7 @@ cognito-idp
- [ ] list_devices - [ ] list_devices
- [X] list_groups - [X] list_groups
- [X] list_identity_providers - [X] list_identity_providers
- [ ] list_resource_servers - [X] list_resource_servers
- [ ] list_tags_for_resource - [ ] list_tags_for_resource
- [ ] list_user_import_jobs - [ ] list_user_import_jobs
- [X] list_user_pool_clients - [X] list_user_pool_clients

View File

@ -1,4 +1,4 @@
# autogenerated by scripts/update_backend_index.py # autogenerated by /Users/plussier/dev/github/moto/scripts/update_backend_index.py
import re import re
backend_url_patterns = [ backend_url_patterns = [
@ -114,7 +114,7 @@ backend_url_patterns = [
), ),
( (
"meteringmarketplace", "meteringmarketplace",
re.compile("https?://metering\\.marketplace.(.+).amazonaws.com"), re.compile("https?://metering.marketplace.(.+).amazonaws.com"),
), ),
("meteringmarketplace", re.compile("https?://aws-marketplace.(.+).amazonaws.com")), ("meteringmarketplace", re.compile("https?://aws-marketplace.(.+).amazonaws.com")),
("mq", re.compile("https?://mq\\.(.+)\\.amazonaws\\.com")), ("mq", re.compile("https?://mq\\.(.+)\\.amazonaws\\.com")),

View File

@ -1705,6 +1705,25 @@ class CognitoIdpBackend(BaseBackend):
user_pool.resource_servers[identifier] = resource_server user_pool.resource_servers[identifier] = resource_server
return resource_server return resource_server
def describe_resource_server(
self, user_pool_id: str, identifier: str
) -> CognitoResourceServer:
user_pool = self.user_pools.get(user_pool_id)
if not user_pool:
raise ResourceNotFoundError(f"User pool {user_pool_id} does not exist.")
resource_server = user_pool.resource_servers.get(identifier)
if not resource_server:
raise ResourceNotFoundError(f"Resource server {identifier} does not exist.")
return resource_server
@paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc]
def list_resource_servers(self, user_pool_id: str) -> List[CognitoResourceServer]:
user_pool = self.user_pools[user_pool_id]
resource_servers = list(user_pool.resource_servers.values())
return resource_servers
def sign_up( def sign_up(
self, self,
client_id: str, client_id: str,

View File

@ -534,6 +534,30 @@ class CognitoIdpResponse(BaseResponse):
) )
return json.dumps({"ResourceServer": resource_server.to_json()}) return json.dumps({"ResourceServer": resource_server.to_json()})
def describe_resource_server(self) -> str:
user_pool_id = self._get_param("UserPoolId")
identifier = self._get_param("Identifier")
resource_server = self.backend.describe_resource_server(
user_pool_id, identifier
)
return json.dumps({"ResourceServer": resource_server.to_json()})
def list_resource_servers(self) -> str:
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken")
user_pool_id = self._get_param("UserPoolId")
resource_servers, next_token = self.backend.list_resource_servers(
user_pool_id, max_results=max_results, next_token=next_token
)
response: Dict[str, Any] = {
"ResourceServers": [
resource_server.to_json() for resource_server in resource_servers
]
}
if next_token:
response["NextToken"] = str(next_token)
return json.dumps(response)
def sign_up(self) -> str: def sign_up(self) -> str:
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
username = self._get_param("Username") username = self._get_param("Username")

View File

@ -49,6 +49,12 @@ PAGINATION_MODEL = {
"limit_default": 60, "limit_default": 60,
"unique_attribute": "id", "unique_attribute": "id",
}, },
"list_resource_servers": {
"input_token": "next_token",
"limit_key": "max_results",
"limit_default": 60,
"unique_attribute": "identifier",
},
} }

View File

@ -3537,7 +3537,7 @@ def test_update_user_attributes_unknown_accesstoken():
@mock_cognitoidp @mock_cognitoidp
def test_resource_server(): def test_create_resource_server():
client = boto3.client("cognito-idp", "us-west-2") client = boto3.client("cognito-idp", "us-west-2")
name = str(uuid.uuid4()) name = str(uuid.uuid4())
res = client.create_user_pool(PoolName=name) res = client.create_user_pool(PoolName=name)
@ -3573,6 +3573,224 @@ def test_resource_server():
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400 assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400
@mock_cognitoidp
def test_describe_resource_server():
# Create a user pool to attach a resource server to
client = boto3.client("cognito-idp", "us-west-2")
name = str(uuid.uuid4())
user_pool = client.create_user_pool(PoolName=name)
user_pool_id = user_pool["UserPool"]["Id"]
server_id = "my_server"
server_name = "new_remote_server"
scopes = [
{"ScopeName": "app:write", "ScopeDescription": "write scope"},
{"ScopeName": "app:read", "ScopeDescription": "read scope"},
]
# Create a new resource server
new_resource_server = client.create_resource_server(
UserPoolId=user_pool_id, Identifier=server_id, Name=server_name, Scopes=scopes
)
assert new_resource_server["ResourceServer"]["UserPoolId"] == user_pool_id
assert new_resource_server["ResourceServer"]["Identifier"] == server_id
assert new_resource_server["ResourceServer"]["Name"] == server_name
assert new_resource_server["ResourceServer"]["Scopes"] == scopes
# Describe the newly created resource server
response = client.describe_resource_server(
UserPoolId=user_pool_id, Identifier=server_id
)
# Assert all the values we expect are seen in the description.
assert response["ResourceServer"]["UserPoolId"] == user_pool_id
assert response["ResourceServer"]["Identifier"] == server_id
assert response["ResourceServer"]["Name"] == server_name
assert response["ResourceServer"]["Scopes"] == scopes
# Make sure attempting to describe a non-existent server fails in
# the expected manner
fake_server_id = "non_existent_server"
negative_response = None
with pytest.raises(ClientError) as ex:
negative_response = client.describe_resource_server(
UserPoolId=user_pool_id, Identifier=fake_server_id
)
# Assert that error message content is what's expected for a failure.
assert negative_response is None
assert ex.value.operation_name == "DescribeResourceServer"
assert ex.value.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
ex.value.response["Error"]["Message"]
== f"Resource server {fake_server_id} does not exist."
)
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400
@mock_cognitoidp
def test_list_resource_servers_empty_set():
# Create a user pool to attach a resource server to
client = boto3.client("cognito-idp", "us-west-2")
name = str(uuid.uuid4())
user_pool = client.create_user_pool(PoolName=name)
user_pool_id = user_pool["UserPool"]["Id"]
# Empty list, because we aren't creating any.
all_resource_svrs = []
max_return = 50
servers = client.list_resource_servers(
UserPoolId=user_pool_id, MaxResults=max_return
)
expected_keys = ["ResourceServers", "ResponseMetadata"]
assert all(key in servers for key in expected_keys)
assert servers["ResponseMetadata"].get("HTTPStatusCode")
assert servers["ResponseMetadata"]["HTTPStatusCode"] == 200
assert servers.get("NextToken", False) is False
assert len(servers["ResourceServers"]) == len(all_resource_svrs)
assert len(servers["ResourceServers"]) == 0
@mock_cognitoidp
def test_list_resource_servers_single_page():
# Create a user pool to attach a resource server to
client = boto3.client("cognito-idp", "us-west-2")
name = str(uuid.uuid4())
user_pool = client.create_user_pool(PoolName=name)
user_pool_id = user_pool["UserPool"]["Id"]
create_num = 48
all_resource_svrs = []
for id_num in range(0, create_num, 1):
server_id = f"my_server{id_num}"
server_name = "new_remote_server{id_num}"
scopes = [
{
"ScopeName": f"app:write{id_num}",
"ScopeDescription": f"write scope{id_num}",
},
{
"ScopeName": f"app:read{id_num}",
"ScopeDescription": f"read scope{id_num}",
},
]
# Create a new resource server
new_resource_server = client.create_resource_server(
UserPoolId=user_pool_id,
Identifier=server_id,
Name=server_name,
Scopes=scopes,
)
all_resource_svrs.append(new_resource_server)
max_return = 50
servers = client.list_resource_servers(
UserPoolId=user_pool_id, MaxResults=max_return
)
expected_keys = ["ResourceServers", "ResponseMetadata"]
assert all(key in servers for key in expected_keys)
assert servers["ResponseMetadata"].get("HTTPStatusCode")
assert servers["ResponseMetadata"]["HTTPStatusCode"] == 200
assert servers.get("NextToken", False) is False
assert len(servers["ResourceServers"]) == create_num
returned_servers = servers["ResourceServers"]
for idx in range(0, create_num - 1, 1):
for key in returned_servers[idx].keys():
assert (
returned_servers[idx][key]
== all_resource_svrs[idx]["ResourceServer"][key]
)
@mock_cognitoidp
def test_list_resource_servers_multi_page():
# Create a user pool to attach a resource server to
client = boto3.client("cognito-idp", "us-west-2")
name = str(uuid.uuid4())
user_pool = client.create_user_pool(PoolName=name)
user_pool_id = user_pool["UserPool"]["Id"]
create_num = 65
all_resource_svrs = []
for id_num in range(0, create_num, 1):
server_id = f"my_server{id_num}"
server_name = "new_remote_server{id_num}"
scopes = [
{
"ScopeName": f"app:write{id_num}",
"ScopeDescription": f"write scope{id_num}",
},
{
"ScopeName": f"app:read{id_num}",
"ScopeDescription": f"read scope{id_num}",
},
]
# Create a new resource server
new_resource_server = client.create_resource_server(
UserPoolId=user_pool_id,
Identifier=server_id,
Name=server_name,
Scopes=scopes,
)
all_resource_svrs.append(new_resource_server)
max_return = 50
servers = client.list_resource_servers(
UserPoolId=user_pool_id, MaxResults=max_return
)
expected_keys = ["ResourceServers", "NextToken", "ResponseMetadata"]
assert all(key in servers for key in expected_keys)
assert servers["ResponseMetadata"].get("HTTPStatusCode")
assert servers["ResponseMetadata"]["HTTPStatusCode"] == 200
assert servers.get("NextToken", False)
assert len(servers["ResourceServers"]) == max_return
returned_servers = servers["ResourceServers"]
for idx in range(0, max_return - 1, 1):
for key in returned_servers[idx].keys():
assert (
returned_servers[idx][key]
== all_resource_svrs[idx]["ResourceServer"][key]
)
next_page = client.list_resource_servers(
UserPoolId=user_pool_id, MaxResults=max_return, NextToken=servers["NextToken"]
)
expected_keys = ["ResourceServers", "ResponseMetadata"]
expected_returns = create_num - max_return
assert all(key in next_page for key in expected_keys)
assert next_page["ResponseMetadata"].get("HTTPStatusCode")
assert next_page["ResponseMetadata"]["HTTPStatusCode"] == 200
assert next_page.get("NextToken", False) is False
assert len(next_page["ResourceServers"]) == expected_returns
returned_servers = next_page["ResourceServers"]
# Check the second page of results
# Each entry in the second page should be the offset of 'max_return + idx' in all_resource_svrs
for idx in range(0, expected_returns, 1):
for key in returned_servers[idx].keys():
all_idx = idx + max_return
assert (
returned_servers[idx][key]
== all_resource_svrs[all_idx]["ResourceServer"][key]
)
@mock_cognitoidp @mock_cognitoidp
def test_sign_up(): def test_sign_up():
conn = boto3.client("cognito-idp", "us-west-2") conn = boto3.client("cognito-idp", "us-west-2")