moto/moto/cognitoidp/responses.py
Steve Pulec a62f15b537
Merge pull request #1730 from garyd203/cognitoidp-groups
Implement groups for cognito-idp
2018-12-28 19:53:53 -05:00

356 lines
13 KiB
Python

from __future__ import unicode_literals
import json
import os
from moto.core.responses import BaseResponse
from .models import cognitoidp_backends, find_region_by_value
class CognitoIdpResponse(BaseResponse):
@property
def parameters(self):
return json.loads(self.body)
# User pool
def create_user_pool(self):
name = self.parameters.pop("PoolName")
user_pool = cognitoidp_backends[self.region].create_user_pool(name, self.parameters)
return json.dumps({
"UserPool": user_pool.to_json(extended=True)
})
def list_user_pools(self):
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken", "0")
user_pools, next_token = cognitoidp_backends[self.region].list_user_pools(
max_results=max_results, next_token=next_token
)
response = {
"UserPools": [user_pool.to_json() for user_pool in user_pools],
}
if next_token:
response["NextToken"] = str(next_token)
return json.dumps(response)
def describe_user_pool(self):
user_pool_id = self._get_param("UserPoolId")
user_pool = cognitoidp_backends[self.region].describe_user_pool(user_pool_id)
return json.dumps({
"UserPool": user_pool.to_json(extended=True)
})
def delete_user_pool(self):
user_pool_id = self._get_param("UserPoolId")
cognitoidp_backends[self.region].delete_user_pool(user_pool_id)
return ""
# User pool domain
def create_user_pool_domain(self):
domain = self._get_param("Domain")
user_pool_id = self._get_param("UserPoolId")
cognitoidp_backends[self.region].create_user_pool_domain(user_pool_id, domain)
return ""
def describe_user_pool_domain(self):
domain = self._get_param("Domain")
user_pool_domain = cognitoidp_backends[self.region].describe_user_pool_domain(domain)
domain_description = {}
if user_pool_domain:
domain_description = user_pool_domain.to_json()
return json.dumps({
"DomainDescription": domain_description
})
def delete_user_pool_domain(self):
domain = self._get_param("Domain")
cognitoidp_backends[self.region].delete_user_pool_domain(domain)
return ""
# User pool client
def create_user_pool_client(self):
user_pool_id = self.parameters.pop("UserPoolId")
user_pool_client = cognitoidp_backends[self.region].create_user_pool_client(user_pool_id, self.parameters)
return json.dumps({
"UserPoolClient": user_pool_client.to_json(extended=True)
})
def list_user_pool_clients(self):
user_pool_id = self._get_param("UserPoolId")
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken", "0")
user_pool_clients, next_token = cognitoidp_backends[self.region].list_user_pool_clients(user_pool_id,
max_results=max_results, next_token=next_token)
response = {
"UserPoolClients": [user_pool_client.to_json() for user_pool_client in user_pool_clients]
}
if next_token:
response["NextToken"] = str(next_token)
return json.dumps(response)
def describe_user_pool_client(self):
user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId")
user_pool_client = cognitoidp_backends[self.region].describe_user_pool_client(user_pool_id, client_id)
return json.dumps({
"UserPoolClient": user_pool_client.to_json(extended=True)
})
def update_user_pool_client(self):
user_pool_id = self.parameters.pop("UserPoolId")
client_id = self.parameters.pop("ClientId")
user_pool_client = cognitoidp_backends[self.region].update_user_pool_client(user_pool_id, client_id, self.parameters)
return json.dumps({
"UserPoolClient": user_pool_client.to_json(extended=True)
})
def delete_user_pool_client(self):
user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId")
cognitoidp_backends[self.region].delete_user_pool_client(user_pool_id, client_id)
return ""
# Identity provider
def create_identity_provider(self):
user_pool_id = self._get_param("UserPoolId")
name = self.parameters.pop("ProviderName")
identity_provider = cognitoidp_backends[self.region].create_identity_provider(user_pool_id, name, self.parameters)
return json.dumps({
"IdentityProvider": identity_provider.to_json(extended=True)
})
def list_identity_providers(self):
user_pool_id = self._get_param("UserPoolId")
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken", "0")
identity_providers, next_token = cognitoidp_backends[self.region].list_identity_providers(
user_pool_id, max_results=max_results, next_token=next_token
)
response = {
"Providers": [identity_provider.to_json() for identity_provider in identity_providers]
}
if next_token:
response["NextToken"] = str(next_token)
return json.dumps(response)
def describe_identity_provider(self):
user_pool_id = self._get_param("UserPoolId")
name = self._get_param("ProviderName")
identity_provider = cognitoidp_backends[self.region].describe_identity_provider(user_pool_id, name)
return json.dumps({
"IdentityProvider": identity_provider.to_json(extended=True)
})
def delete_identity_provider(self):
user_pool_id = self._get_param("UserPoolId")
name = self._get_param("ProviderName")
cognitoidp_backends[self.region].delete_identity_provider(user_pool_id, name)
return ""
# Group
def create_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
description = self._get_param("Description")
role_arn = self._get_param("RoleArn")
precedence = self._get_param("Precedence")
group = cognitoidp_backends[self.region].create_group(
user_pool_id,
group_name,
description,
role_arn,
precedence,
)
return json.dumps({
"Group": group.to_json(),
})
def get_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
group = cognitoidp_backends[self.region].get_group(user_pool_id, group_name)
return json.dumps({
"Group": group.to_json(),
})
def list_groups(self):
user_pool_id = self._get_param("UserPoolId")
groups = cognitoidp_backends[self.region].list_groups(user_pool_id)
return json.dumps({
"Groups": [group.to_json() for group in groups],
})
def delete_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
cognitoidp_backends[self.region].delete_group(user_pool_id, group_name)
return ""
def admin_add_user_to_group(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
group_name = self._get_param("GroupName")
cognitoidp_backends[self.region].admin_add_user_to_group(
user_pool_id,
group_name,
username,
)
return ""
def list_users_in_group(self):
user_pool_id = self._get_param("UserPoolId")
group_name = self._get_param("GroupName")
users = cognitoidp_backends[self.region].list_users_in_group(user_pool_id, group_name)
return json.dumps({
"Users": [user.to_json(extended=True) for user in users],
})
def admin_list_groups_for_user(self):
username = self._get_param("Username")
user_pool_id = self._get_param("UserPoolId")
groups = cognitoidp_backends[self.region].admin_list_groups_for_user(user_pool_id, username)
return json.dumps({
"Groups": [group.to_json() for group in groups],
})
def admin_remove_user_from_group(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
group_name = self._get_param("GroupName")
cognitoidp_backends[self.region].admin_remove_user_from_group(
user_pool_id,
group_name,
username,
)
return ""
# User
def admin_create_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
temporary_password = self._get_param("TemporaryPassword")
user = cognitoidp_backends[self.region].admin_create_user(
user_pool_id,
username,
temporary_password,
self._get_param("UserAttributes", [])
)
return json.dumps({
"User": user.to_json(extended=True)
})
def admin_get_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
user = cognitoidp_backends[self.region].admin_get_user(user_pool_id, username)
return json.dumps(
user.to_json(extended=True, attributes_key="UserAttributes")
)
def list_users(self):
user_pool_id = self._get_param("UserPoolId")
limit = self._get_param("Limit")
token = self._get_param("PaginationToken")
users, token = cognitoidp_backends[self.region].list_users(user_pool_id,
limit=limit,
pagination_token=token)
response = {"Users": [user.to_json(extended=True) for user in users]}
if token:
response["PaginationToken"] = str(token)
return json.dumps(response)
def admin_disable_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_disable_user(user_pool_id, username)
return ""
def admin_enable_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_enable_user(user_pool_id, username)
return ""
def admin_delete_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_delete_user(user_pool_id, username)
return ""
def admin_initiate_auth(self):
user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId")
auth_flow = self._get_param("AuthFlow")
auth_parameters = self._get_param("AuthParameters")
auth_result = cognitoidp_backends[self.region].admin_initiate_auth(
user_pool_id,
client_id,
auth_flow,
auth_parameters,
)
return json.dumps(auth_result)
def respond_to_auth_challenge(self):
session = self._get_param("Session")
client_id = self._get_param("ClientId")
challenge_name = self._get_param("ChallengeName")
challenge_responses = self._get_param("ChallengeResponses")
auth_result = cognitoidp_backends[self.region].respond_to_auth_challenge(
session,
client_id,
challenge_name,
challenge_responses,
)
return json.dumps(auth_result)
def forgot_password(self):
return json.dumps({
"CodeDeliveryDetails": {
"DeliveryMedium": "EMAIL",
"Destination": "...",
}
})
# This endpoint receives no authorization header, so if moto-server is listening
# on localhost (doesn't get a region in the host header), it doesn't know what
# region's backend should handle the traffic, and we use `find_region_by_value` to
# solve that problem.
def confirm_forgot_password(self):
client_id = self._get_param("ClientId")
username = self._get_param("Username")
password = self._get_param("Password")
region = find_region_by_value("client_id", client_id)
cognitoidp_backends[region].confirm_forgot_password(client_id, username, password)
return ""
# Ditto the comment on confirm_forgot_password.
def change_password(self):
access_token = self._get_param("AccessToken")
previous_password = self._get_param("PreviousPassword")
proposed_password = self._get_param("ProposedPassword")
region = find_region_by_value("access_token", access_token)
cognitoidp_backends[region].change_password(access_token, previous_password, proposed_password)
return ""
class CognitoIdpJsonWebKeyResponse(BaseResponse):
def __init__(self):
with open(os.path.join(os.path.dirname(__file__), "resources/jwks-public.json")) as f:
self.json_web_key = f.read()
def serve_json_web_key(self, request, full_url, headers):
return 200, {"Content-Type": "application/json"}, self.json_web_key