moto/moto/cognitoidp/responses.py

573 lines
22 KiB
Python
Raw Normal View History

2018-05-02 21:13:12 +00:00
import json
import os
import re
2018-05-02 21:13:12 +00:00
from moto.core.responses import BaseResponse
from .models import cognitoidp_backends, find_region_by_value, UserStatus
from .exceptions import InvalidParameterException
2018-05-02 21:13:12 +00:00
class CognitoIdpResponse(BaseResponse):
@property
def parameters(self):
return json.loads(self.body)
# User pool
def create_user_pool(self):
name = self.parameters.pop("PoolName")
2019-10-31 15:44:26 +00:00
user_pool = cognitoidp_backends[self.region].create_user_pool(
name, self.parameters
)
return json.dumps({"UserPool": user_pool.to_json(extended=True)})
2018-05-02 21:13:12 +00:00
def set_user_pool_mfa_config(self):
user_pool_id = self._get_param("UserPoolId")
sms_config = self._get_param("SmsMfaConfiguration", None)
token_config = self._get_param("SoftwareTokenMfaConfiguration", None)
mfa_config = self._get_param("MfaConfiguration")
if mfa_config not in ["ON", "OFF", "OPTIONAL"]:
raise InvalidParameterException(
"[MfaConfiguration] must be one of 'ON', 'OFF', or 'OPTIONAL'."
)
if mfa_config in ["ON", "OPTIONAL"]:
if sms_config is None and token_config is None:
raise InvalidParameterException(
"At least one of [SmsMfaConfiguration] or [SoftwareTokenMfaConfiguration] must be provided."
)
if sms_config is not None:
if "SmsConfiguration" not in sms_config:
raise InvalidParameterException(
"[SmsConfiguration] is a required member of [SoftwareTokenMfaConfiguration]."
)
response = cognitoidp_backends[self.region].set_user_pool_mfa_config(
user_pool_id, sms_config, token_config, mfa_config
)
return json.dumps(response)
def get_user_pool_mfa_config(self):
user_pool_id = self._get_param("UserPoolId")
response = cognitoidp_backends[self.region].get_user_pool_mfa_config(
user_pool_id
)
return json.dumps(response)
2018-05-02 21:13:12 +00:00
def list_user_pools(self):
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken")
user_pools, next_token = cognitoidp_backends[self.region].list_user_pools(
max_results=max_results, next_token=next_token
)
2019-10-31 15:44:26 +00:00
response = {"UserPools": [user_pool.to_json() for user_pool in user_pools]}
if next_token:
response["NextToken"] = str(next_token)
return json.dumps(response)
2018-05-02 21:13:12 +00:00
def describe_user_pool(self):
user_pool_id = self._get_param("UserPoolId")
user_pool = cognitoidp_backends[self.region].describe_user_pool(user_pool_id)
2019-10-31 15:44:26 +00:00
return json.dumps({"UserPool": user_pool.to_json(extended=True)})
2018-05-02 21:13:12 +00:00
def update_user_pool(self):
user_pool_id = self._get_param("UserPoolId")
cognitoidp_backends[self.region].update_user_pool(user_pool_id, self.parameters)
2018-05-02 21:13:12 +00:00
def delete_user_pool(self):
user_pool_id = self._get_param("UserPoolId")
cognitoidp_backends[self.region].delete_user_pool(user_pool_id)
return ""
# User pool domain
def create_user_pool_domain(self):
domain = self._get_param("Domain")
user_pool_id = self._get_param("UserPoolId")
custom_domain_config = self._get_param("CustomDomainConfig")
user_pool_domain = cognitoidp_backends[self.region].create_user_pool_domain(
user_pool_id, domain, custom_domain_config
)
domain_description = user_pool_domain.to_json(extended=False)
if domain_description:
return json.dumps(domain_description)
2018-05-02 21:13:12 +00:00
return ""
def describe_user_pool_domain(self):
domain = self._get_param("Domain")
2019-10-31 15:44:26 +00:00
user_pool_domain = cognitoidp_backends[self.region].describe_user_pool_domain(
domain
)
2018-05-02 21:13:12 +00:00
domain_description = {}
if user_pool_domain:
domain_description = user_pool_domain.to_json()
2019-10-31 15:44:26 +00:00
return json.dumps({"DomainDescription": domain_description})
2018-05-02 21:13:12 +00:00
def delete_user_pool_domain(self):
domain = self._get_param("Domain")
cognitoidp_backends[self.region].delete_user_pool_domain(domain)
return ""
def update_user_pool_domain(self):
domain = self._get_param("Domain")
custom_domain_config = self._get_param("CustomDomainConfig")
user_pool_domain = cognitoidp_backends[self.region].update_user_pool_domain(
domain, custom_domain_config
)
domain_description = user_pool_domain.to_json(extended=False)
if domain_description:
return json.dumps(domain_description)
return ""
2018-05-02 21:13:12 +00:00
# User pool client
def create_user_pool_client(self):
user_pool_id = self.parameters.pop("UserPoolId")
generate_secret = self.parameters.pop("GenerateSecret", False)
2019-10-31 15:44:26 +00:00
user_pool_client = cognitoidp_backends[self.region].create_user_pool_client(
user_pool_id, generate_secret, self.parameters
2019-10-31 15:44:26 +00:00
)
return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})
2018-05-02 21:13:12 +00:00
def list_user_pool_clients(self):
user_pool_id = self._get_param("UserPoolId")
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken")
2019-10-31 15:44:26 +00:00
user_pool_clients, next_token = cognitoidp_backends[
self.region
].list_user_pool_clients(
user_pool_id, max_results=max_results, next_token=next_token
)
response = {
2019-10-31 15:44:26 +00:00
"UserPoolClients": [
user_pool_client.to_json() for user_pool_client in user_pool_clients
]
}
if next_token:
response["NextToken"] = str(next_token)
return json.dumps(response)
2018-05-02 21:13:12 +00:00
def describe_user_pool_client(self):
user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId")
2019-10-31 15:44:26 +00:00
user_pool_client = cognitoidp_backends[self.region].describe_user_pool_client(
user_pool_id, client_id
)
return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})
2018-05-02 21:13:12 +00:00
def update_user_pool_client(self):
user_pool_id = self.parameters.pop("UserPoolId")
client_id = self.parameters.pop("ClientId")
2019-10-31 15:44:26 +00:00
user_pool_client = cognitoidp_backends[self.region].update_user_pool_client(
user_pool_id, client_id, self.parameters
)
return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})
2018-05-02 21:13:12 +00:00
def delete_user_pool_client(self):
user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId")
2019-10-31 15:44:26 +00:00
cognitoidp_backends[self.region].delete_user_pool_client(
user_pool_id, client_id
)
2018-05-02 21:13:12 +00:00
return ""
# Identity provider
def create_identity_provider(self):
user_pool_id = self._get_param("UserPoolId")
name = self.parameters.pop("ProviderName")
2019-10-31 15:44:26 +00:00
identity_provider = cognitoidp_backends[self.region].create_identity_provider(
user_pool_id, name, self.parameters
)
return json.dumps(
{"IdentityProvider": identity_provider.to_json(extended=True)}
)
2018-05-02 21:13:12 +00:00
def list_identity_providers(self):
user_pool_id = self._get_param("UserPoolId")
max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken")
2019-10-31 15:44:26 +00:00
identity_providers, next_token = cognitoidp_backends[
self.region
].list_identity_providers(
user_pool_id, max_results=max_results, next_token=next_token
)
response = {
2019-10-31 15:44:26 +00:00
"Providers": [
identity_provider.to_json() for identity_provider in identity_providers
]
}
if next_token:
response["NextToken"] = str(next_token)
return json.dumps(response)
2018-05-02 21:13:12 +00:00
def describe_identity_provider(self):
user_pool_id = self._get_param("UserPoolId")
name = self._get_param("ProviderName")
2019-10-31 15:44:26 +00:00
identity_provider = cognitoidp_backends[self.region].describe_identity_provider(
user_pool_id, name
)
return json.dumps(
{"IdentityProvider": identity_provider.to_json(extended=True)}
)
2018-05-02 21:13:12 +00:00
def update_identity_provider(self):
user_pool_id = self._get_param("UserPoolId")
name = self._get_param("ProviderName")
2019-10-31 15:44:26 +00:00
identity_provider = cognitoidp_backends[self.region].update_identity_provider(
user_pool_id, name, self.parameters
)
return json.dumps(
{"IdentityProvider": identity_provider.to_json(extended=True)}
)
2018-05-02 21:13:12 +00:00
def delete_identity_provider(self):
user_pool_id = self._get_param("UserPoolId")
name = self._get_param("ProviderName")
cognitoidp_backends[self.region].delete_identity_provider(user_pool_id, name)
return ""
# Group
def create_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
description = self._get_param("Description")
role_arn = self._get_param("RoleArn")
precedence = self._get_param("Precedence")
group = cognitoidp_backends[self.region].create_group(
2019-10-31 15:44:26 +00:00
user_pool_id, group_name, description, role_arn, precedence
)
2019-10-31 15:44:26 +00:00
return json.dumps({"Group": group.to_json()})
def get_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
group = cognitoidp_backends[self.region].get_group(user_pool_id, group_name)
2019-10-31 15:44:26 +00:00
return json.dumps({"Group": group.to_json()})
def list_groups(self):
user_pool_id = self._get_param("UserPoolId")
groups = cognitoidp_backends[self.region].list_groups(user_pool_id)
2019-10-31 15:44:26 +00:00
return json.dumps({"Groups": [group.to_json() for group in groups]})
def delete_group(self):
group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId")
cognitoidp_backends[self.region].delete_group(user_pool_id, group_name)
return ""
def admin_add_user_to_group(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
group_name = self._get_param("GroupName")
cognitoidp_backends[self.region].admin_add_user_to_group(
2019-10-31 15:44:26 +00:00
user_pool_id, group_name, username
)
return ""
def list_users_in_group(self):
user_pool_id = self._get_param("UserPoolId")
group_name = self._get_param("GroupName")
2019-10-31 15:44:26 +00:00
users = cognitoidp_backends[self.region].list_users_in_group(
user_pool_id, group_name
)
return json.dumps({"Users": [user.to_json(extended=True) for user in users]})
def admin_list_groups_for_user(self):
username = self._get_param("Username")
user_pool_id = self._get_param("UserPoolId")
2019-10-31 15:44:26 +00:00
groups = cognitoidp_backends[self.region].admin_list_groups_for_user(
user_pool_id, username
)
return json.dumps({"Groups": [group.to_json() for group in groups]})
def admin_remove_user_from_group(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
group_name = self._get_param("GroupName")
cognitoidp_backends[self.region].admin_remove_user_from_group(
2019-10-31 15:44:26 +00:00
user_pool_id, group_name, username
)
return ""
def admin_reset_user_password(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_reset_user_password(
user_pool_id, username
)
return ""
2018-05-02 21:13:12 +00:00
# User
def admin_create_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
message_action = self._get_param("MessageAction")
2018-05-02 21:13:12 +00:00
temporary_password = self._get_param("TemporaryPassword")
user = cognitoidp_backends[self.region].admin_create_user(
user_pool_id,
username,
message_action,
2018-05-02 21:13:12 +00:00
temporary_password,
2019-10-31 15:44:26 +00:00
self._get_param("UserAttributes", []),
2018-05-02 21:13:12 +00:00
)
2019-10-31 15:44:26 +00:00
return json.dumps({"User": user.to_json(extended=True)})
2018-05-02 21:13:12 +00:00
def admin_confirm_sign_up(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
return cognitoidp_backends[self.region].admin_confirm_sign_up(
user_pool_id, username
)
2018-05-02 21:13:12 +00:00
def admin_get_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
user = cognitoidp_backends[self.region].admin_get_user(user_pool_id, username)
2019-10-31 15:44:26 +00:00
return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))
2018-05-02 21:13:12 +00:00
def get_user(self):
access_token = self._get_param("AccessToken")
user = cognitoidp_backends[self.region].get_user(access_token=access_token)
return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))
2018-05-02 21:13:12 +00:00
def list_users(self):
user_pool_id = self._get_param("UserPoolId")
limit = self._get_param("Limit")
token = self._get_param("PaginationToken")
filt = self._get_param("Filter")
2019-10-31 15:44:26 +00:00
users, token = cognitoidp_backends[self.region].list_users(
user_pool_id, limit=limit, pagination_token=token
)
if filt:
inherent_attributes = {
"cognito:user_status": lambda u: u.status,
"status": lambda u: "Enabled" if u.enabled else "Disabled",
"username": lambda u: u.username,
}
comparisons = {"=": lambda x, y: x == y, "^=": lambda x, y: x.startswith(y)}
allowed_attributes = [
"username",
"email",
"phone_number",
"name",
"given_name",
"family_name",
"preferred_username",
"cognito:user_status",
"status",
"sub",
]
match = re.match(r"([\w:]+)\s*(=|\^=)\s*\"(.*)\"", filt)
if match:
name, op, value = match.groups()
else:
raise InvalidParameterException("Error while parsing filter")
if name not in allowed_attributes:
raise InvalidParameterException(f"Invalid search attribute: {name}")
compare = comparisons[op]
2020-02-05 13:19:08 +00:00
users = [
user
for user in users
Merge LocalStack changes into upstream moto (#4082) * fix OPTIONS requests on non-existing API GW integrations * add cloudformation models for API Gateway deployments * bump version * add backdoor to return CloudWatch metrics * Updating implementation coverage * Updating implementation coverage * add cloudformation models for API Gateway deployments * Updating implementation coverage * Updating implementation coverage * Implemented get-caller-identity returning real data depending on the access key used. * bump version * minor fixes * fix Number data_type for SQS message attribute * fix handling of encoding errors * bump version * make CF stack queryable before starting to initialize its resources * bump version * fix integration_method for API GW method integrations * fix undefined status in CF FakeStack * Fix apigateway issues with terraform v0.12.21 * resource_methods -> add handle for "DELETE" method * integrations -> fix issue that "httpMethod" wasn't included in body request (this value was set as the value from refer method resource) * bump version * Fix setting http method for API gateway integrations (#6) * bump version * remove duplicate methods * add storage class to S3 Key when completing multipart upload (#7) * fix SQS performance issues; bump version * add pagination to SecretsManager list-secrets (#9) * fix default parameter groups in RDS * fix adding S3 metadata headers with names containing dots (#13) * Updating implementation coverage * Updating implementation coverage * add cloudformation models for API Gateway deployments * Updating implementation coverage * Updating implementation coverage * Implemented get-caller-identity returning real data depending on the access key used. * make CF stack queryable before starting to initialize its resources * bump version * remove duplicate methods * fix adding S3 metadata headers with names containing dots (#13) * Update amis.json to support EKS AMI mocks (#15) * fix PascalCase for boolean value in ListMultipartUploads response (#17); fix _get_multi_param to parse nested list/dict query params * determine non-zero container exit code in Batch API * support filtering by dimensions in CW get_metric_statistics * fix storing attributes for ELBv2 Route entities; API GW refactorings for TF tests * add missing fields for API GW resources * fix error messages for Route53 (TF-compat) * various fixes for IAM resources (tf-compat) * minor fixes for API GW models (tf-compat) * minor fixes for API GW responses (tf-compat) * add s3 exception for bucket notification filter rule validation * change the way RESTErrors generate the response body and content-type header * fix lint errors and disable "black" syntax enforcement * remove return type hint in RESTError.get_body * add RESTError XML template for IAM exceptions * add support for API GW minimumCompressionSize * fix casing getting PrivateDnsEnabled API GW attribute * minor fixes for error responses * fix escaping special chars for IAM role descriptions (tf-compat) * minor fixes and tagging support for API GW and ELB v2 (tf-compat) * Merge branch 'master' into localstack * add "AlarmRule" attribute to enable support for composite CloudWatch metrics * fix recursive parsing of complex/nested query params * bump version * add API to delete S3 website configurations (#18) * use dict copy to allow parallelism and avoid concurrent modification exceptions in S3 * fix precondition check for etags in S3 (#19) * minor fix for user filtering in Cognito * fix API Gateway error response; avoid returning empty response templates (tf-compat) * support tags and tracingEnabled attribute for API GW stages * fix boolean value in S3 encryption response (#20) * fix connection arn structure * fix api destination arn structure * black format * release 2.0.3.37 * fix s3 exception tests see botocore/parsers.py:1002 where RequestId is removed from parsed * remove python 2 from build action * add test failure annotations in build action * fix events test arn comparisons * fix s3 encryption response test * return default value "0" if EC2 availableIpAddressCount is empty * fix extracting SecurityGroupIds for EC2 VPC endpoints * support deleting/updating API Gateway DomainNames * fix(events): Return empty string instead of null when no pattern is specified in EventPattern (tf-compat) (#22) * fix logic and revert CF changes to get tests running again (#21) * add support for EC2 customer gateway API (#25) * add support for EC2 Transit Gateway APIs (#24) * feat(logs): add `kmsKeyId` into `LogGroup` entity (#23) * minor change in ELBv2 logic to fix tests * feat(events): add APIs to describe and delete CloudWatch Events connections (#26) * add support for EC2 transit gateway route tables (#27) * pass transit gateway route table ID in Describe API, minor refactoring (#29) * add support for EC2 Transit Gateway Routes (#28) * fix region on ACM certificate import (#31) * add support for EC2 transit gateway attachments (#30) * add support for EC2 Transit Gateway VPN attachments (#32) * fix account ID for logs API * add support for DeleteOrganization API * feat(events): store raw filter representation for CloudWatch events patterns (tf-compat) (#36) * feat(events): add support to describe/update/delete CloudWatch API destinations (#35) * add Cognito UpdateIdentityPool, CW Logs PutResourcePolicy * feat(events): add support for tags in EventBus API (#38) * fix parameter validation for Batch compute environments (tf-compat) * revert merge conflicts in IMPLEMENTATION_COVERAGE.md * format code using black * restore original README; re-enable and fix CloudFormation tests * restore tests and old logic for CF stack parameters from SSM * parameterize RequestId/RequestID in response messages and revert related test changes * undo LocalStack-specific adaptations * minor fix * Update CodeCov config to reflect removal of Py2 * undo change related to CW metric filtering; add additional test for CW metric statistics with dimensions * Terraform - Extend whitelist of running tests Co-authored-by: acsbendi <acsbendi28@gmail.com> Co-authored-by: Phan Duong <duongpv@outlook.com> Co-authored-by: Thomas Rausch <thomas@thrau.at> Co-authored-by: Macwan Nevil <macnev2013@gmail.com> Co-authored-by: Dominik Schubert <dominik.schubert91@gmail.com> Co-authored-by: Gonzalo Saad <saad.gonzalo.ale@gmail.com> Co-authored-by: Mohit Alonja <monty16597@users.noreply.github.com> Co-authored-by: Miguel Gagliardo <migag9@gmail.com> Co-authored-by: Bert Blommers <info@bertblommers.nl>
2021-07-26 14:21:17 +00:00
if [
attr
for attr in user.attributes
if attr["Name"] == name and compare(attr["Value"], value)
Merge LocalStack changes into upstream moto (#4082) * fix OPTIONS requests on non-existing API GW integrations * add cloudformation models for API Gateway deployments * bump version * add backdoor to return CloudWatch metrics * Updating implementation coverage * Updating implementation coverage * add cloudformation models for API Gateway deployments * Updating implementation coverage * Updating implementation coverage * Implemented get-caller-identity returning real data depending on the access key used. * bump version * minor fixes * fix Number data_type for SQS message attribute * fix handling of encoding errors * bump version * make CF stack queryable before starting to initialize its resources * bump version * fix integration_method for API GW method integrations * fix undefined status in CF FakeStack * Fix apigateway issues with terraform v0.12.21 * resource_methods -> add handle for "DELETE" method * integrations -> fix issue that "httpMethod" wasn't included in body request (this value was set as the value from refer method resource) * bump version * Fix setting http method for API gateway integrations (#6) * bump version * remove duplicate methods * add storage class to S3 Key when completing multipart upload (#7) * fix SQS performance issues; bump version * add pagination to SecretsManager list-secrets (#9) * fix default parameter groups in RDS * fix adding S3 metadata headers with names containing dots (#13) * Updating implementation coverage * Updating implementation coverage * add cloudformation models for API Gateway deployments * Updating implementation coverage * Updating implementation coverage * Implemented get-caller-identity returning real data depending on the access key used. * make CF stack queryable before starting to initialize its resources * bump version * remove duplicate methods * fix adding S3 metadata headers with names containing dots (#13) * Update amis.json to support EKS AMI mocks (#15) * fix PascalCase for boolean value in ListMultipartUploads response (#17); fix _get_multi_param to parse nested list/dict query params * determine non-zero container exit code in Batch API * support filtering by dimensions in CW get_metric_statistics * fix storing attributes for ELBv2 Route entities; API GW refactorings for TF tests * add missing fields for API GW resources * fix error messages for Route53 (TF-compat) * various fixes for IAM resources (tf-compat) * minor fixes for API GW models (tf-compat) * minor fixes for API GW responses (tf-compat) * add s3 exception for bucket notification filter rule validation * change the way RESTErrors generate the response body and content-type header * fix lint errors and disable "black" syntax enforcement * remove return type hint in RESTError.get_body * add RESTError XML template for IAM exceptions * add support for API GW minimumCompressionSize * fix casing getting PrivateDnsEnabled API GW attribute * minor fixes for error responses * fix escaping special chars for IAM role descriptions (tf-compat) * minor fixes and tagging support for API GW and ELB v2 (tf-compat) * Merge branch 'master' into localstack * add "AlarmRule" attribute to enable support for composite CloudWatch metrics * fix recursive parsing of complex/nested query params * bump version * add API to delete S3 website configurations (#18) * use dict copy to allow parallelism and avoid concurrent modification exceptions in S3 * fix precondition check for etags in S3 (#19) * minor fix for user filtering in Cognito * fix API Gateway error response; avoid returning empty response templates (tf-compat) * support tags and tracingEnabled attribute for API GW stages * fix boolean value in S3 encryption response (#20) * fix connection arn structure * fix api destination arn structure * black format * release 2.0.3.37 * fix s3 exception tests see botocore/parsers.py:1002 where RequestId is removed from parsed * remove python 2 from build action * add test failure annotations in build action * fix events test arn comparisons * fix s3 encryption response test * return default value "0" if EC2 availableIpAddressCount is empty * fix extracting SecurityGroupIds for EC2 VPC endpoints * support deleting/updating API Gateway DomainNames * fix(events): Return empty string instead of null when no pattern is specified in EventPattern (tf-compat) (#22) * fix logic and revert CF changes to get tests running again (#21) * add support for EC2 customer gateway API (#25) * add support for EC2 Transit Gateway APIs (#24) * feat(logs): add `kmsKeyId` into `LogGroup` entity (#23) * minor change in ELBv2 logic to fix tests * feat(events): add APIs to describe and delete CloudWatch Events connections (#26) * add support for EC2 transit gateway route tables (#27) * pass transit gateway route table ID in Describe API, minor refactoring (#29) * add support for EC2 Transit Gateway Routes (#28) * fix region on ACM certificate import (#31) * add support for EC2 transit gateway attachments (#30) * add support for EC2 Transit Gateway VPN attachments (#32) * fix account ID for logs API * add support for DeleteOrganization API * feat(events): store raw filter representation for CloudWatch events patterns (tf-compat) (#36) * feat(events): add support to describe/update/delete CloudWatch API destinations (#35) * add Cognito UpdateIdentityPool, CW Logs PutResourcePolicy * feat(events): add support for tags in EventBus API (#38) * fix parameter validation for Batch compute environments (tf-compat) * revert merge conflicts in IMPLEMENTATION_COVERAGE.md * format code using black * restore original README; re-enable and fix CloudFormation tests * restore tests and old logic for CF stack parameters from SSM * parameterize RequestId/RequestID in response messages and revert related test changes * undo LocalStack-specific adaptations * minor fix * Update CodeCov config to reflect removal of Py2 * undo change related to CW metric filtering; add additional test for CW metric statistics with dimensions * Terraform - Extend whitelist of running tests Co-authored-by: acsbendi <acsbendi28@gmail.com> Co-authored-by: Phan Duong <duongpv@outlook.com> Co-authored-by: Thomas Rausch <thomas@thrau.at> Co-authored-by: Macwan Nevil <macnev2013@gmail.com> Co-authored-by: Dominik Schubert <dominik.schubert91@gmail.com> Co-authored-by: Gonzalo Saad <saad.gonzalo.ale@gmail.com> Co-authored-by: Mohit Alonja <monty16597@users.noreply.github.com> Co-authored-by: Miguel Gagliardo <migag9@gmail.com> Co-authored-by: Bert Blommers <info@bertblommers.nl>
2021-07-26 14:21:17 +00:00
]
or (
name in inherent_attributes
and compare(inherent_attributes[name](user), value)
)
2020-02-05 13:19:08 +00:00
]
response = {"Users": [user.to_json(extended=True) for user in users]}
if token:
response["PaginationToken"] = str(token)
return json.dumps(response)
2018-05-02 21:13:12 +00:00
def admin_disable_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_disable_user(user_pool_id, username)
return ""
def admin_enable_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_enable_user(user_pool_id, username)
return ""
2018-05-02 21:13:12 +00:00
def admin_delete_user(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_delete_user(user_pool_id, username)
return ""
def admin_initiate_auth(self):
user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId")
auth_flow = self._get_param("AuthFlow")
auth_parameters = self._get_param("AuthParameters")
auth_result = cognitoidp_backends[self.region].admin_initiate_auth(
2019-10-31 15:44:26 +00:00
user_pool_id, client_id, auth_flow, auth_parameters
2018-05-02 21:13:12 +00:00
)
return json.dumps(auth_result)
def respond_to_auth_challenge(self):
session = self._get_param("Session")
client_id = self._get_param("ClientId")
challenge_name = self._get_param("ChallengeName")
challenge_responses = self._get_param("ChallengeResponses")
auth_result = cognitoidp_backends[self.region].respond_to_auth_challenge(
2019-10-31 15:44:26 +00:00
session, client_id, challenge_name, challenge_responses
2018-05-02 21:13:12 +00:00
)
return json.dumps(auth_result)
def forgot_password(self):
client_id = self._get_param("ClientId")
username = self._get_param("Username")
region = find_region_by_value("client_id", client_id)
response = cognitoidp_backends[region].forgot_password(client_id, username)
return json.dumps(response)
2018-05-02 21:13:12 +00:00
# This endpoint receives no authorization header, so if moto-server is listening
# on localhost (doesn't get a region in the host header), it doesn't know what
# region's backend should handle the traffic, and we use `find_region_by_value` to
# solve that problem.
def confirm_forgot_password(self):
client_id = self._get_param("ClientId")
username = self._get_param("Username")
password = self._get_param("Password")
region = find_region_by_value("client_id", client_id)
2019-10-31 15:44:26 +00:00
cognitoidp_backends[region].confirm_forgot_password(
client_id, username, password
)
2018-05-02 21:13:12 +00:00
return ""
# Ditto the comment on confirm_forgot_password.
def change_password(self):
access_token = self._get_param("AccessToken")
previous_password = self._get_param("PreviousPassword")
proposed_password = self._get_param("ProposedPassword")
region = find_region_by_value("access_token", access_token)
2019-10-31 15:44:26 +00:00
cognitoidp_backends[region].change_password(
access_token, previous_password, proposed_password
)
2018-05-02 21:13:12 +00:00
return ""
def admin_update_user_attributes(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
attributes = self._get_param("UserAttributes")
2019-10-31 15:44:26 +00:00
cognitoidp_backends[self.region].admin_update_user_attributes(
user_pool_id, username, attributes
)
return ""
def admin_user_global_sign_out(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
cognitoidp_backends[self.region].admin_user_global_sign_out(
user_pool_id, username
)
return ""
# Resource Server
def create_resource_server(self):
user_pool_id = self._get_param("UserPoolId")
identifier = self._get_param("Identifier")
name = self._get_param("Name")
scopes = self._get_param("Scopes")
resource_server = cognitoidp_backends[self.region].create_resource_server(
user_pool_id, identifier, name, scopes
)
return json.dumps({"ResourceServer": resource_server.to_json()})
def sign_up(self):
client_id = self._get_param("ClientId")
username = self._get_param("Username")
password = self._get_param("Password")
user = cognitoidp_backends[self.region].sign_up(
client_id=client_id,
username=username,
password=password,
attributes=self._get_param("UserAttributes", []),
)
return json.dumps(
{
"UserConfirmed": user.status == UserStatus["CONFIRMED"],
"UserSub": user.id,
}
)
def confirm_sign_up(self):
client_id = self._get_param("ClientId")
username = self._get_param("Username")
confirmation_code = self._get_param("ConfirmationCode")
cognitoidp_backends[self.region].confirm_sign_up(
client_id=client_id, username=username, confirmation_code=confirmation_code,
)
return ""
def initiate_auth(self):
client_id = self._get_param("ClientId")
auth_flow = self._get_param("AuthFlow")
auth_parameters = self._get_param("AuthParameters")
auth_result = cognitoidp_backends[self.region].initiate_auth(
client_id, auth_flow, auth_parameters
)
return json.dumps(auth_result)
def associate_software_token(self):
access_token = self._get_param("AccessToken")
result = cognitoidp_backends[self.region].associate_software_token(access_token)
return json.dumps(result)
def verify_software_token(self):
access_token = self._get_param("AccessToken")
user_code = self._get_param("UserCode")
result = cognitoidp_backends[self.region].verify_software_token(
access_token, user_code
)
return json.dumps(result)
def set_user_mfa_preference(self):
access_token = self._get_param("AccessToken")
software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings")
sms_mfa_settings = self._get_param("SMSMfaSettings")
cognitoidp_backends[self.region].set_user_mfa_preference(
access_token, software_token_mfa_settings, sms_mfa_settings
)
return ""
def admin_set_user_password(self):
user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username")
password = self._get_param("Password")
permanent = self._get_param("Permanent")
cognitoidp_backends[self.region].admin_set_user_password(
user_pool_id, username, password, permanent
)
return ""
2018-05-02 21:13:12 +00:00
class CognitoIdpJsonWebKeyResponse(BaseResponse):
def __init__(self):
2019-10-31 15:44:26 +00:00
with open(
os.path.join(os.path.dirname(__file__), "resources/jwks-public.json")
) as f:
2018-05-02 21:13:12 +00:00
self.json_web_key = f.read()
def serve_json_web_key(self, request, full_url, headers):
return 200, {"Content-Type": "application/json"}, self.json_web_key