"""HTTP response handlers for the mocked Cognito Identity Provider service.

Each public method on :class:`CognitoIdpResponse` reads its inputs from the
JSON request body, delegates to a backend object, and returns the response
body as a string.
"""
import json
import os
import re

from moto.core.responses import BaseResponse
from .models import (
    cognitoidp_backends,
    find_account_region_by_value,
    RegionAgnosticBackend,
    UserStatus,
)
from .exceptions import InvalidParameterException


region_agnostic_backend = RegionAgnosticBackend()

# Grammar of a list_users filter expression: <name> (= | ^=) "<value>".
_USER_FILTER_PATTERN = re.compile(r"([\w:]+)\s*(=|\^=)\s*\"(.*)\"")

# Attribute names accepted on the left-hand side of a list_users filter.
_FILTERABLE_USER_ATTRIBUTES = (
    "username",
    "email",
    "phone_number",
    "name",
    "given_name",
    "family_name",
    "preferred_username",
    "cognito:user_status",
    "status",
    "sub",
)

# Filterable names whose value is read from the user object itself rather than
# from its ``attributes`` list.
_INHERENT_USER_ATTRIBUTES = {
    "cognito:user_status": lambda u: u.status,
    "status": lambda u: "Enabled" if u.enabled else "Disabled",
    "username": lambda u: u.username,
}

# Filter operators: exact match and prefix match.
_USER_FILTER_COMPARISONS = {
    "=": lambda x, y: x == y,
    "^=": lambda x, y: x.startswith(y),
}


def _filter_users(users, filt):
    """Return the members of ``users`` that satisfy the filter expression ``filt``.

    A user matches when one of its ``attributes`` entries has the requested
    name and a matching value, or when the name is an inherent attribute
    (status, enabled state, username) whose value matches.

    Raises:
        InvalidParameterException: if ``filt`` cannot be parsed or names an
            attribute that is not filterable.
    """
    match = _USER_FILTER_PATTERN.match(filt)
    if not match:
        raise InvalidParameterException("Error while parsing filter")
    name, op, value = match.groups()
    if name not in _FILTERABLE_USER_ATTRIBUTES:
        raise InvalidParameterException(f"Invalid search attribute: {name}")

    compare = _USER_FILTER_COMPARISONS[op]
    inherent = _INHERENT_USER_ATTRIBUTES.get(name)
    return [
        user
        for user in users
        if any(
            attr["Name"] == name and compare(attr["Value"], value)
            for attr in user.attributes
        )
        or (inherent is not None and compare(inherent(user), value))
    ]


class CognitoIdpResponse(BaseResponse):
    """Request handlers for the ``cognito-idp`` service."""

    def __init__(self):
        super().__init__(service_name="cognito-idp")

    @property
    def parameters(self):
        """The request body parsed as JSON.

        The body is re-parsed on every access, so mutating the returned dict
        (e.g. with ``pop``) does not affect what a later access returns.
        """
        return json.loads(self.body)

    @property
    def backend(self):
        """The backend for the current account and region."""
        return cognitoidp_backends[self.current_account][self.region]

    # User pool
    def create_user_pool(self):
        """Create a user pool named ``PoolName`` and return it as extended JSON."""
        name = self.parameters.pop("PoolName")
        user_pool = self.backend.create_user_pool(name, self.parameters)
        return json.dumps({"UserPool": user_pool.to_json(extended=True)})

    def set_user_pool_mfa_config(self):
        """Validate the MFA settings, pass them to the backend, and return its reply.

        Raises:
            InvalidParameterException: if ``MfaConfiguration`` is not ``ON``,
                ``OFF`` or ``OPTIONAL``; if it is ``ON``/``OPTIONAL`` and neither
                ``SmsMfaConfiguration`` nor ``SoftwareTokenMfaConfiguration`` is
                given; or if ``SmsMfaConfiguration`` is given without a
                ``SmsConfiguration`` member.
        """
        user_pool_id = self._get_param("UserPoolId")
        sms_config = self._get_param("SmsMfaConfiguration", None)
        token_config = self._get_param("SoftwareTokenMfaConfiguration", None)
        mfa_config = self._get_param("MfaConfiguration")

        if mfa_config not in ["ON", "OFF", "OPTIONAL"]:
            raise InvalidParameterException(
                "[MfaConfiguration] must be one of 'ON', 'OFF', or 'OPTIONAL'."
            )

        # NOTE(review): nesting reconstructed from whitespace-collapsed source;
        # the SMS check is assumed to apply only when MFA is ON/OPTIONAL - confirm.
        if mfa_config in ["ON", "OPTIONAL"]:
            if sms_config is None and token_config is None:
                raise InvalidParameterException(
                    "At least one of [SmsMfaConfiguration] or [SoftwareTokenMfaConfiguration] must be provided."
                )
            if sms_config is not None and "SmsConfiguration" not in sms_config:
                # The member is looked up in SmsMfaConfiguration, so name that
                # structure in the message.
                raise InvalidParameterException(
                    "[SmsConfiguration] is a required member of [SmsMfaConfiguration]."
                )

        response = self.backend.set_user_pool_mfa_config(
            user_pool_id, sms_config, token_config, mfa_config
        )
        return json.dumps(response)

    def get_user_pool_mfa_config(self):
        """Return the backend's MFA configuration for ``UserPoolId`` as JSON."""
        user_pool_id = self._get_param("UserPoolId")
        response = self.backend.get_user_pool_mfa_config(user_pool_id)
        return json.dumps(response)

    def list_user_pools(self):
        """Return one page of user pools, plus ``NextToken`` when there is one."""
        max_results = self._get_param("MaxResults")
        next_token = self._get_param("NextToken")
        user_pools, next_token = self.backend.list_user_pools(
            max_results=max_results, next_token=next_token
        )
        response = {"UserPools": [user_pool.to_json() for user_pool in user_pools]}
        if next_token:
            response["NextToken"] = str(next_token)
        return json.dumps(response)

    def describe_user_pool(self):
        """Return the user pool identified by ``UserPoolId`` as extended JSON."""
        user_pool_id = self._get_param("UserPoolId")
        user_pool = self.backend.describe_user_pool(user_pool_id)
        return json.dumps({"UserPool": user_pool.to_json(extended=True)})

    def update_user_pool(self):
        """Pass the full request parameters to the backend's ``update_user_pool``.

        No response body is built here; the method returns ``None``.
        """
        user_pool_id = self._get_param("UserPoolId")
        self.backend.update_user_pool(user_pool_id, self.parameters)

    def delete_user_pool(self):
        """Delete the user pool via the backend and return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        self.backend.delete_user_pool(user_pool_id)
        return ""

    # User pool domain
    def create_user_pool_domain(self):
        """Create a domain for a user pool.

        Returns the domain's non-extended JSON, or an empty body when that
        JSON is empty.
        """
        domain = self._get_param("Domain")
        user_pool_id = self._get_param("UserPoolId")
        custom_domain_config = self._get_param("CustomDomainConfig")
        user_pool_domain = self.backend.create_user_pool_domain(
            user_pool_id, domain, custom_domain_config
        )
        domain_description = user_pool_domain.to_json(extended=False)
        if domain_description:
            return json.dumps(domain_description)
        return ""

    def describe_user_pool_domain(self):
        """Return ``DomainDescription`` for ``Domain``.

        The description is an empty object when the backend returns a falsy
        value for the domain.
        """
        domain = self._get_param("Domain")
        user_pool_domain = self.backend.describe_user_pool_domain(domain)
        domain_description = {}
        if user_pool_domain:
            domain_description = user_pool_domain.to_json()
        return json.dumps({"DomainDescription": domain_description})

    def delete_user_pool_domain(self):
        """Delete the domain via the backend and return an empty body."""
        domain = self._get_param("Domain")
        self.backend.delete_user_pool_domain(domain)
        return ""

    def update_user_pool_domain(self):
        """Update a domain's custom config.

        Returns the domain's non-extended JSON, or an empty body when that
        JSON is empty.
        """
        domain = self._get_param("Domain")
        custom_domain_config = self._get_param("CustomDomainConfig")
        user_pool_domain = self.backend.update_user_pool_domain(
            domain, custom_domain_config
        )
        domain_description = user_pool_domain.to_json(extended=False)
        if domain_description:
            return json.dumps(domain_description)
        return ""

    # User pool client
    def create_user_pool_client(self):
        """Create a user pool client and return it as extended JSON.

        ``GenerateSecret`` defaults to ``False`` when absent from the request.
        """
        user_pool_id = self.parameters.pop("UserPoolId")
        generate_secret = self.parameters.pop("GenerateSecret", False)
        user_pool_client = self.backend.create_user_pool_client(
            user_pool_id, generate_secret, self.parameters
        )
        return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})

    def list_user_pool_clients(self):
        """Return one page of a pool's clients, plus ``NextToken`` when there is one."""
        user_pool_id = self._get_param("UserPoolId")
        max_results = self._get_param("MaxResults")
        next_token = self._get_param("NextToken")
        user_pool_clients, next_token = self.backend.list_user_pool_clients(
            user_pool_id, max_results=max_results, next_token=next_token
        )
        response = {
            "UserPoolClients": [
                user_pool_client.to_json() for user_pool_client in user_pool_clients
            ]
        }
        if next_token:
            response["NextToken"] = str(next_token)
        return json.dumps(response)

    def describe_user_pool_client(self):
        """Return the client identified by ``UserPoolId``/``ClientId`` as extended JSON."""
        user_pool_id = self._get_param("UserPoolId")
        client_id = self._get_param("ClientId")
        user_pool_client = self.backend.describe_user_pool_client(
            user_pool_id, client_id
        )
        return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})

    def update_user_pool_client(self):
        """Update a user pool client and return it as extended JSON."""
        user_pool_id = self.parameters.pop("UserPoolId")
        client_id = self.parameters.pop("ClientId")
        user_pool_client = self.backend.update_user_pool_client(
            user_pool_id, client_id, self.parameters
        )
        return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})

    def delete_user_pool_client(self):
        """Delete the client via the backend and return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        client_id = self._get_param("ClientId")
        self.backend.delete_user_pool_client(user_pool_id, client_id)
        return ""

    # Identity provider
    def create_identity_provider(self):
        """Create an identity provider and return it as extended JSON."""
        user_pool_id = self._get_param("UserPoolId")
        name = self.parameters.pop("ProviderName")
        identity_provider = self.backend.create_identity_provider(
            user_pool_id, name, self.parameters
        )
        return json.dumps(
            {"IdentityProvider": identity_provider.to_json(extended=True)}
        )

    def list_identity_providers(self):
        """Return one page of a pool's providers, plus ``NextToken`` when there is one."""
        user_pool_id = self._get_param("UserPoolId")
        max_results = self._get_param("MaxResults")
        next_token = self._get_param("NextToken")
        identity_providers, next_token = self.backend.list_identity_providers(
            user_pool_id, max_results=max_results, next_token=next_token
        )
        response = {
            "Providers": [
                identity_provider.to_json() for identity_provider in identity_providers
            ]
        }
        if next_token:
            response["NextToken"] = str(next_token)
        return json.dumps(response)

    def describe_identity_provider(self):
        """Return the provider identified by ``ProviderName`` as extended JSON."""
        user_pool_id = self._get_param("UserPoolId")
        name = self._get_param("ProviderName")
        identity_provider = self.backend.describe_identity_provider(user_pool_id, name)
        return json.dumps(
            {"IdentityProvider": identity_provider.to_json(extended=True)}
        )

    def update_identity_provider(self):
        """Update an identity provider and return it as extended JSON."""
        user_pool_id = self._get_param("UserPoolId")
        name = self._get_param("ProviderName")
        identity_provider = self.backend.update_identity_provider(
            user_pool_id, name, self.parameters
        )
        return json.dumps(
            {"IdentityProvider": identity_provider.to_json(extended=True)}
        )

    def delete_identity_provider(self):
        """Delete the identity provider via the backend and return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        name = self._get_param("ProviderName")
        self.backend.delete_identity_provider(user_pool_id, name)
        return ""

    # Group
    def create_group(self):
        """Create a group in a user pool and return it as JSON."""
        group_name = self._get_param("GroupName")
        user_pool_id = self._get_param("UserPoolId")
        description = self._get_param("Description")
        role_arn = self._get_param("RoleArn")
        precedence = self._get_param("Precedence")
        group = self.backend.create_group(
            user_pool_id, group_name, description, role_arn, precedence
        )
        return json.dumps({"Group": group.to_json()})

    def get_group(self):
        """Return the group identified by ``UserPoolId``/``GroupName`` as JSON."""
        group_name = self._get_param("GroupName")
        user_pool_id = self._get_param("UserPoolId")
        group = self.backend.get_group(user_pool_id, group_name)
        return json.dumps({"Group": group.to_json()})

    def list_groups(self):
        """Return one page of a pool's groups, plus ``NextToken`` when there is one."""
        user_pool_id = self._get_param("UserPoolId")
        limit = self._get_param("Limit")
        token = self._get_param("NextToken")
        groups, token = self.backend.list_groups(
            user_pool_id, limit=limit, next_token=token
        )
        response = {"Groups": [group.to_json() for group in groups]}
        if token:
            # Stringified for consistency with the other list_* handlers.
            response["NextToken"] = str(token)
        return json.dumps(response)

    def delete_group(self):
        """Delete the group via the backend and return an empty body."""
        group_name = self._get_param("GroupName")
        user_pool_id = self._get_param("UserPoolId")
        self.backend.delete_group(user_pool_id, group_name)
        return ""

    def update_group(self):
        """Update a group's description, role ARN and precedence; return it as JSON."""
        group_name = self._get_param("GroupName")
        user_pool_id = self._get_param("UserPoolId")
        description = self._get_param("Description")
        role_arn = self._get_param("RoleArn")
        precedence = self._get_param("Precedence")
        group = self.backend.update_group(
            user_pool_id, group_name, description, role_arn, precedence
        )
        return json.dumps({"Group": group.to_json()})

    def admin_add_user_to_group(self):
        """Add ``Username`` to ``GroupName`` via the backend; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        group_name = self._get_param("GroupName")
        self.backend.admin_add_user_to_group(user_pool_id, group_name, username)
        return ""

    def list_users_in_group(self):
        """Return one page of a group's users, plus ``NextToken`` when there is one."""
        user_pool_id = self._get_param("UserPoolId")
        group_name = self._get_param("GroupName")
        limit = self._get_param("Limit")
        token = self._get_param("NextToken")
        users, token = self.backend.list_users_in_group(
            user_pool_id, group_name, limit=limit, next_token=token
        )
        response = {"Users": [user.to_json(extended=True) for user in users]}
        if token:
            # Stringified for consistency with the other list_* handlers.
            response["NextToken"] = str(token)
        return json.dumps(response)

    def admin_list_groups_for_user(self):
        """Return the groups the backend reports for ``Username`` as JSON."""
        username = self._get_param("Username")
        user_pool_id = self._get_param("UserPoolId")
        groups = self.backend.admin_list_groups_for_user(user_pool_id, username)
        return json.dumps({"Groups": [group.to_json() for group in groups]})

    def admin_remove_user_from_group(self):
        """Remove ``Username`` from ``GroupName`` via the backend; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        group_name = self._get_param("GroupName")
        self.backend.admin_remove_user_from_group(user_pool_id, group_name, username)
        return ""

    def admin_reset_user_password(self):
        """Call the backend's ``admin_reset_user_password``; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        self.backend.admin_reset_user_password(user_pool_id, username)
        return ""

    # User
    def admin_create_user(self):
        """Create a user and return it as extended JSON.

        ``UserAttributes`` defaults to an empty list when absent.
        """
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        message_action = self._get_param("MessageAction")
        temporary_password = self._get_param("TemporaryPassword")
        user = self.backend.admin_create_user(
            user_pool_id,
            username,
            message_action,
            temporary_password,
            self._get_param("UserAttributes", []),
        )
        return json.dumps({"User": user.to_json(extended=True)})

    def admin_confirm_sign_up(self):
        """Return the backend's ``admin_confirm_sign_up`` result unchanged."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        return self.backend.admin_confirm_sign_up(user_pool_id, username)

    def admin_get_user(self):
        """Return a user as extended JSON with attributes under ``UserAttributes``."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        user = self.backend.admin_get_user(user_pool_id, username)
        return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))

    def get_user(self):
        """Look a user up by ``AccessToken`` through the region-agnostic backend."""
        access_token = self._get_param("AccessToken")
        user = region_agnostic_backend.get_user(access_token=access_token)
        return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))

    def list_users(self):
        """Return one page of a pool's users, optionally narrowed by ``Filter``.

        The filter is applied to the page the backend returns, i.e. after
        pagination, so a filtered page may hold fewer than ``Limit`` users.

        Raises:
            InvalidParameterException: if ``Filter`` cannot be parsed or names
                an attribute that is not filterable.
        """
        user_pool_id = self._get_param("UserPoolId")
        limit = self._get_param("Limit")
        token = self._get_param("PaginationToken")
        filt = self._get_param("Filter")
        attributes_to_get = self._get_param("AttributesToGet")
        users, token = self.backend.list_users(
            user_pool_id, limit=limit, pagination_token=token
        )
        if filt:
            users = _filter_users(users, filt)
        response = {
            "Users": [
                user.to_json(extended=True, attributes_to_get=attributes_to_get)
                for user in users
            ]
        }
        if token:
            response["PaginationToken"] = str(token)
        return json.dumps(response)

    def admin_disable_user(self):
        """Call the backend's ``admin_disable_user``; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        self.backend.admin_disable_user(user_pool_id, username)
        return ""

    def admin_enable_user(self):
        """Call the backend's ``admin_enable_user``; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        self.backend.admin_enable_user(user_pool_id, username)
        return ""

    def admin_delete_user(self):
        """Call the backend's ``admin_delete_user``; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        self.backend.admin_delete_user(user_pool_id, username)
        return ""

    def admin_initiate_auth(self):
        """Return the backend's ``admin_initiate_auth`` result as JSON."""
        user_pool_id = self._get_param("UserPoolId")
        client_id = self._get_param("ClientId")
        auth_flow = self._get_param("AuthFlow")
        auth_parameters = self._get_param("AuthParameters")
        auth_result = self.backend.admin_initiate_auth(
            user_pool_id, client_id, auth_flow, auth_parameters
        )
        return json.dumps(auth_result)

    def respond_to_auth_challenge(self):
        """Return the region-agnostic backend's challenge response as JSON."""
        session = self._get_param("Session")
        client_id = self._get_param("ClientId")
        challenge_name = self._get_param("ChallengeName")
        challenge_responses = self._get_param("ChallengeResponses")
        auth_result = region_agnostic_backend.respond_to_auth_challenge(
            session, client_id, challenge_name, challenge_responses
        )
        return json.dumps(auth_result)

    def forgot_password(self):
        """Call ``forgot_password`` on the backend that owns ``ClientId``.

        The confirmation code returned by the backend is exposed in the
        ``x-moto-forgot-password-confirmation-code`` response header; the
        backend's response is returned as JSON.
        """
        client_id = self._get_param("ClientId")
        username = self._get_param("Username")
        account, region = find_account_region_by_value("client_id", client_id)
        confirmation_code, response = cognitoidp_backends[account][
            region
        ].forgot_password(client_id, username)
        self.response_headers[
            "x-moto-forgot-password-confirmation-code"
        ] = confirmation_code
        return json.dumps(response)

    # This endpoint receives no authorization header, so if moto-server is listening
    # on localhost (doesn't get a region in the host header), it doesn't know what
    # region's backend should handle the traffic, and we use
    # `find_account_region_by_value` to solve that problem.
    def confirm_forgot_password(self):
        """Call ``confirm_forgot_password`` on the backend that owns ``ClientId``."""
        client_id = self._get_param("ClientId")
        username = self._get_param("Username")
        password = self._get_param("Password")
        confirmation_code = self._get_param("ConfirmationCode")
        account, region = find_account_region_by_value("client_id", client_id)
        cognitoidp_backends[account][region].confirm_forgot_password(
            client_id, username, password, confirmation_code
        )
        return ""

    # Ditto the comment on confirm_forgot_password.
    def change_password(self):
        """Call ``change_password`` on the backend that owns ``AccessToken``."""
        access_token = self._get_param("AccessToken")
        previous_password = self._get_param("PreviousPassword")
        proposed_password = self._get_param("ProposedPassword")
        account, region = find_account_region_by_value("access_token", access_token)
        cognitoidp_backends[account][region].change_password(
            access_token, previous_password, proposed_password
        )
        return ""

    def admin_update_user_attributes(self):
        """Pass ``UserAttributes`` to the backend; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        attributes = self._get_param("UserAttributes")
        self.backend.admin_update_user_attributes(user_pool_id, username, attributes)
        return ""

    def admin_delete_user_attributes(self):
        """Pass ``UserAttributeNames`` to the backend; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        attributes = self._get_param("UserAttributeNames")
        self.backend.admin_delete_user_attributes(user_pool_id, username, attributes)
        return ""

    def admin_user_global_sign_out(self):
        """Call the backend's ``admin_user_global_sign_out``; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        self.backend.admin_user_global_sign_out(user_pool_id, username)
        return ""

    def global_sign_out(self):
        """Call the backend's ``global_sign_out`` with ``AccessToken``; return an empty body."""
        access_token = self._get_param("AccessToken")
        self.backend.global_sign_out(access_token)
        return ""

    # Resource Server
    def create_resource_server(self):
        """Create a resource server in a user pool and return it as JSON."""
        user_pool_id = self._get_param("UserPoolId")
        identifier = self._get_param("Identifier")
        name = self._get_param("Name")
        scopes = self._get_param("Scopes")
        resource_server = self.backend.create_resource_server(
            user_pool_id, identifier, name, scopes
        )
        return json.dumps({"ResourceServer": resource_server.to_json()})

    def sign_up(self):
        """Register a user through the region-agnostic backend.

        Returns JSON with ``UserConfirmed`` (whether the new user's status
        equals ``UserStatus["CONFIRMED"]``) and ``UserSub`` (the user's id).
        ``UserAttributes`` defaults to an empty list when absent.
        """
        client_id = self._get_param("ClientId")
        username = self._get_param("Username")
        password = self._get_param("Password")
        user = region_agnostic_backend.sign_up(
            client_id=client_id,
            username=username,
            password=password,
            attributes=self._get_param("UserAttributes", []),
        )
        return json.dumps(
            {
                "UserConfirmed": user.status == UserStatus["CONFIRMED"],
                "UserSub": user.id,
            }
        )

    def confirm_sign_up(self):
        """Call the region-agnostic backend's ``confirm_sign_up``; return an empty body."""
        client_id = self._get_param("ClientId")
        username = self._get_param("Username")
        region_agnostic_backend.confirm_sign_up(client_id=client_id, username=username)
        return ""

    def initiate_auth(self):
        """Return the region-agnostic backend's ``initiate_auth`` result as JSON."""
        client_id = self._get_param("ClientId")
        auth_flow = self._get_param("AuthFlow")
        auth_parameters = self._get_param("AuthParameters")
        auth_result = region_agnostic_backend.initiate_auth(
            client_id, auth_flow, auth_parameters
        )
        return json.dumps(auth_result)

    def associate_software_token(self):
        """Return the backend's ``associate_software_token`` result as JSON."""
        access_token = self._get_param("AccessToken")
        result = self.backend.associate_software_token(access_token)
        return json.dumps(result)

    def verify_software_token(self):
        """Return the backend's ``verify_software_token`` result as JSON."""
        access_token = self._get_param("AccessToken")
        result = self.backend.verify_software_token(access_token)
        return json.dumps(result)

    def set_user_mfa_preference(self):
        """Pass the token holder's MFA settings to the backend; return an empty body."""
        access_token = self._get_param("AccessToken")
        software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings")
        sms_mfa_settings = self._get_param("SMSMfaSettings")
        self.backend.set_user_mfa_preference(
            access_token, software_token_mfa_settings, sms_mfa_settings
        )
        return ""

    def admin_set_user_mfa_preference(self):
        """Pass the named user's MFA settings to the backend; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings")
        sms_mfa_settings = self._get_param("SMSMfaSettings")
        self.backend.admin_set_user_mfa_preference(
            user_pool_id, username, software_token_mfa_settings, sms_mfa_settings
        )
        return ""

    def admin_set_user_password(self):
        """Pass ``Password``/``Permanent`` to the backend; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        username = self._get_param("Username")
        password = self._get_param("Password")
        permanent = self._get_param("Permanent")
        self.backend.admin_set_user_password(
            user_pool_id, username, password, permanent
        )
        return ""

    def add_custom_attributes(self):
        """Pass ``CustomAttributes`` to the backend; return an empty body."""
        user_pool_id = self._get_param("UserPoolId")
        custom_attributes = self._get_param("CustomAttributes")
        self.backend.add_custom_attributes(user_pool_id, custom_attributes)
        return ""

    def update_user_attributes(self):
        """Pass ``UserAttributes`` for the token holder to the backend; return ``{}``."""
        access_token = self._get_param("AccessToken")
        attributes = self._get_param("UserAttributes")
        self.backend.update_user_attributes(access_token, attributes)
        return json.dumps({})


class CognitoIdpJsonWebKeyResponse(BaseResponse):
    """Serves the bundled ``resources/jwks-public.json`` file."""

    def __init__(self):
        # The key set is read once at construction time and served verbatim.
        with open(
            os.path.join(os.path.dirname(__file__), "resources/jwks-public.json"),
            encoding="utf-8",
        ) as f:
            self.json_web_key = f.read()

    def serve_json_web_key(
        self, request, full_url, headers
    ):  # pylint: disable=unused-argument
        """Return a ``(status, headers, body)`` tuple carrying the JWKS text as JSON."""
        return 200, {"Content-Type": "application/json"}, self.json_web_key