diff --git a/moto/cognitoidentity/exceptions.py b/moto/cognitoidentity/exceptions.py index 2a83ffd69..ed8135efc 100644 --- a/moto/cognitoidentity/exceptions.py +++ b/moto/cognitoidentity/exceptions.py @@ -2,7 +2,7 @@ from moto.core.exceptions import JsonRESTError class ResourceNotFoundError(JsonRESTError): - def __init__(self, message): + def __init__(self, message: str): super().__init__(error_type="ResourceNotFoundException", message=message) @@ -10,6 +10,6 @@ class InvalidNameException(JsonRESTError): message = "1 validation error detected: Value '{}' at 'identityPoolName' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w\\s+=,.@-]+" - def __init__(self, name): + def __init__(self, name: str): msg = InvalidNameException.message.format(name) super().__init__(error_type="ValidationException", message=msg) diff --git a/moto/cognitoidentity/models.py b/moto/cognitoidentity/models.py index 10c974779..f9f2f0d99 100644 --- a/moto/cognitoidentity/models.py +++ b/moto/cognitoidentity/models.py @@ -3,14 +3,15 @@ import json import re from collections import OrderedDict +from typing import Any, Dict, List, Optional from moto.core import BaseBackend, BaseModel from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict from .exceptions import InvalidNameException, ResourceNotFoundError from .utils import get_random_identity_id -class CognitoIdentity(BaseModel): - def __init__(self, region, identity_pool_name, **kwargs): +class CognitoIdentityPool(BaseModel): + def __init__(self, region: str, identity_pool_name: str, **kwargs: Any): self.identity_pool_name = identity_pool_name if not re.fullmatch(r"[\w\s+=,.@-]+", identity_pool_name): @@ -32,7 +33,7 @@ class CognitoIdentity(BaseModel): self.tags = kwargs.get("tags") or {} - def to_json(self): + def to_json(self) -> str: return json.dumps( { "IdentityPoolId": self.identity_pool_id, @@ -43,50 +44,37 @@ class CognitoIdentity(BaseModel): "OpenIdConnectProviderARNs": self.open_id_connect_provider_arns, "CognitoIdentityProviders": self.cognito_identity_providers, "SamlProviderARNs": self.saml_provider_arns, + "IdentityPoolTags": self.tags, } ) class CognitoIdentityBackend(BaseBackend): - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.identity_pools = OrderedDict() - self.pools_identities = {} + self.identity_pools: Dict[str, CognitoIdentityPool] = OrderedDict() + self.pools_identities: Dict[str, Dict[str, Any]] = {} - def describe_identity_pool(self, identity_pool_id): + def describe_identity_pool(self, identity_pool_id: str) -> str: identity_pool = self.identity_pools.get(identity_pool_id, None) if not identity_pool: raise ResourceNotFoundError(identity_pool_id) - response = json.dumps( - { - "AllowUnauthenticatedIdentities": identity_pool.allow_unauthenticated_identities, - "CognitoIdentityProviders": identity_pool.cognito_identity_providers, - "DeveloperProviderName": identity_pool.developer_provider_name, - "IdentityPoolId": identity_pool.identity_pool_id, - "IdentityPoolName": identity_pool.identity_pool_name, - "IdentityPoolTags": identity_pool.tags, - "OpenIdConnectProviderARNs": identity_pool.open_id_connect_provider_arns, - "SamlProviderARNs": identity_pool.saml_provider_arns, - "SupportedLoginProviders": identity_pool.supported_login_providers, - } - ) - - return response + return identity_pool.to_json() def create_identity_pool( self, - identity_pool_name, - allow_unauthenticated_identities, - supported_login_providers, - developer_provider_name, - open_id_connect_provider_arns, - cognito_identity_providers, - saml_provider_arns, - tags=None, - ): - new_identity = CognitoIdentity( + identity_pool_name: str, + allow_unauthenticated_identities: bool, + supported_login_providers: Dict[str, str], + developer_provider_name: str, + open_id_connect_provider_arns: List[str], + cognito_identity_providers: List[Dict[str, Any]], + saml_provider_arns: List[str], + tags: Dict[str, str], + ) -> str: + new_identity = CognitoIdentityPool( self.region_name, identity_pool_name, allow_unauthenticated_identities=allow_unauthenticated_identities, @@ -106,21 +94,20 @@ class CognitoIdentityBackend(BaseBackend): } } ) - response = new_identity.to_json() - return response + return new_identity.to_json() def update_identity_pool( self, - identity_pool_id, - identity_pool_name, - allow_unauthenticated, - login_providers, - provider_name, - provider_arns, - identity_providers, - saml_providers, - tags=None, - ): + identity_pool_id: str, + identity_pool_name: str, + allow_unauthenticated: Optional[bool], + login_providers: Optional[Dict[str, str]], + provider_name: Optional[str], + provider_arns: Optional[List[str]], + identity_providers: Optional[List[Dict[str, Any]]], + saml_providers: Optional[List[str]], + tags: Optional[Dict[str, str]], + ) -> str: """ The AllowClassic-parameter has not yet been implemented """ @@ -141,20 +128,19 @@ class CognitoIdentityBackend(BaseBackend): if tags: pool.tags = tags - response = pool.to_json() - return response + return pool.to_json() - def get_id(self, identity_pool_id: str): + def get_id(self, identity_pool_id: str) -> str: identity_id = {"IdentityId": get_random_identity_id(self.region_name)} self.pools_identities[identity_pool_id]["Identities"].append(identity_id) return json.dumps(identity_id) - def get_credentials_for_identity(self, identity_id): + def get_credentials_for_identity(self, identity_id: str) -> str: duration = 90 now = datetime.datetime.utcnow() expiration = now + datetime.timedelta(seconds=duration) expiration_str = str(iso_8601_datetime_with_milliseconds(expiration)) - response = json.dumps( + return json.dumps( { "Credentials": { "AccessKeyId": "TESTACCESSKEY12345", @@ -165,32 +151,28 @@ class CognitoIdentityBackend(BaseBackend): "IdentityId": identity_id, } ) - return response - def get_open_id_token_for_developer_identity(self, identity_id): - response = json.dumps( + def get_open_id_token_for_developer_identity(self, identity_id: str) -> str: + return json.dumps( { "IdentityId": identity_id, "Token": get_random_identity_id(self.region_name), } ) - return response - def get_open_id_token(self, identity_id): - response = json.dumps( + def get_open_id_token(self, identity_id: str) -> str: + return json.dumps( { "IdentityId": identity_id, "Token": get_random_identity_id(self.region_name), } ) - return response - def list_identities(self, identity_pool_id): + def list_identities(self, identity_pool_id: str) -> str: """ The MaxResults-parameter has not yet been implemented """ - response = json.dumps(self.pools_identities[identity_pool_id]) - return response + return json.dumps(self.pools_identities[identity_pool_id]) cognitoidentity_backends = BackendDict(CognitoIdentityBackend, "cognito-identity") diff --git a/moto/cognitoidentity/responses.py b/moto/cognitoidentity/responses.py index 97f4d1d5e..34499ed02 100644 --- a/moto/cognitoidentity/responses.py +++ b/moto/cognitoidentity/responses.py @@ -1,17 +1,17 @@ from moto.core.responses import BaseResponse -from .models import cognitoidentity_backends +from .models import cognitoidentity_backends, CognitoIdentityBackend from .utils import get_random_identity_id class CognitoIdentityResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="cognito-identity") @property - def backend(self): + def backend(self) -> CognitoIdentityBackend: return cognitoidentity_backends[self.current_account][self.region] - def create_identity_pool(self): + def create_identity_pool(self) -> str: identity_pool_name = self._get_param("IdentityPoolName") allow_unauthenticated_identities = self._get_param( "AllowUnauthenticatedIdentities" @@ -34,7 +34,7 @@ class CognitoIdentityResponse(BaseResponse): tags=pool_tags, ) - def update_identity_pool(self): + def update_identity_pool(self) -> str: pool_id = self._get_param("IdentityPoolId") pool_name = self._get_param("IdentityPoolName") allow_unauthenticated = self._get_bool_param("AllowUnauthenticatedIdentities") @@ -57,26 +57,26 @@ class CognitoIdentityResponse(BaseResponse): tags=pool_tags, ) - def get_id(self): + def get_id(self) -> str: return self.backend.get_id(identity_pool_id=self._get_param("IdentityPoolId")) - def describe_identity_pool(self): + def describe_identity_pool(self) -> str: return self.backend.describe_identity_pool(self._get_param("IdentityPoolId")) - def get_credentials_for_identity(self): + def get_credentials_for_identity(self) -> str: return self.backend.get_credentials_for_identity(self._get_param("IdentityId")) - def get_open_id_token_for_developer_identity(self): + def get_open_id_token_for_developer_identity(self) -> str: return self.backend.get_open_id_token_for_developer_identity( self._get_param("IdentityId") or get_random_identity_id(self.region) ) - def get_open_id_token(self): + def get_open_id_token(self) -> str: return self.backend.get_open_id_token( self._get_param("IdentityId") or get_random_identity_id(self.region) ) - def list_identities(self): + def list_identities(self) -> str: return self.backend.list_identities( self._get_param("IdentityPoolId") or get_random_identity_id(self.region) ) diff --git a/moto/cognitoidentity/utils.py b/moto/cognitoidentity/utils.py index 34a1798d1..8ec0ff3e3 100644 --- a/moto/cognitoidentity/utils.py +++ b/moto/cognitoidentity/utils.py @@ -1,5 +1,5 @@ from moto.moto_api._internal import mock_random -def get_random_identity_id(region): +def get_random_identity_id(region: str) -> str: return "{0}:{1}".format(region, mock_random.uuid4()) diff --git a/moto/cognitoidp/exceptions.py b/moto/cognitoidp/exceptions.py index 62d32b50e..9bcef7fb1 100644 --- a/moto/cognitoidp/exceptions.py +++ b/moto/cognitoidp/exceptions.py @@ -1,43 +1,44 @@ from moto.core.exceptions import JsonRESTError +from typing import Optional class ResourceNotFoundError(JsonRESTError): - def __init__(self, message): - super().__init__(error_type="ResourceNotFoundException", message=message) + def __init__(self, message: Optional[str]): + super().__init__(error_type="ResourceNotFoundException", message=message or "") class UserNotFoundError(JsonRESTError): - def __init__(self, message): + def __init__(self, message: str): super().__init__(error_type="UserNotFoundException", message=message) class UsernameExistsException(JsonRESTError): - def __init__(self, message): + def __init__(self, message: str): super().__init__(error_type="UsernameExistsException", message=message) class GroupExistsException(JsonRESTError): - def __init__(self, message): + def __init__(self, message: str): super().__init__(error_type="GroupExistsException", message=message) class NotAuthorizedError(JsonRESTError): - def __init__(self, message): - super().__init__(error_type="NotAuthorizedException", message=message) + def __init__(self, message: Optional[str]): + super().__init__(error_type="NotAuthorizedException", message=message or "") class UserNotConfirmedException(JsonRESTError): - def __init__(self, message): + def __init__(self, message: str): super().__init__(error_type="UserNotConfirmedException", message=message) class ExpiredCodeException(JsonRESTError): - def __init__(self, message): + def __init__(self, message: str): super().__init__(error_type="ExpiredCodeException", message=message) class InvalidParameterException(JsonRESTError): - def __init__(self, msg=None): + def __init__(self, msg: Optional[str] = None): self.code = 400 super().__init__( "InvalidParameterException", msg or "A parameter is specified incorrectly." diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index d49965e98..b7ee41e64 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -6,6 +6,7 @@ import typing import enum from jose import jws from collections import OrderedDict +from typing import Any, Dict, List, Tuple, Optional, Set from moto.core import BaseBackend, BaseModel from moto.core.utils import BackendDict from moto.moto_api._internal import mock_random as random @@ -51,7 +52,7 @@ class AuthFlow(str, enum.Enum): USER_PASSWORD_AUTH = "USER_PASSWORD_AUTH" @classmethod - def list(cls): + def list(cls) -> List[str]: return [e.value for e in cls] @@ -180,7 +181,7 @@ class CognitoIdpUserPoolAttribute(BaseModel): ATTRIBUTE_DATA_TYPES = {"Boolean", "DateTime", "String", "Number"} - def __init__(self, name, custom, schema): + def __init__(self, name: str, custom: bool, schema: Dict[str, Any]): self.name = name self.custom = custom attribute_data_type = schema.get("AttributeDataType", None) @@ -198,7 +199,7 @@ class CognitoIdpUserPoolAttribute(BaseModel): else: self._init_standard(schema) - def _init_custom(self, schema): + def _init_custom(self, schema: Dict[str, Any]) -> None: self.name = "custom:" + self.name attribute_data_type = schema.get("AttributeDataType", None) if not attribute_data_type: @@ -217,7 +218,7 @@ class CognitoIdpUserPoolAttribute(BaseModel): self.required = False self._init_constraints(schema, None, show_empty_constraints=True) - def _init_standard(self, schema): + def _init_standard(self, schema: Dict[str, Any]) -> None: attribute_data_type = schema.get("AttributeDataType", None) default_attribute_data_type = CognitoIdpUserPoolAttribute.STANDARD_SCHEMA[ self.name @@ -253,11 +254,14 @@ class CognitoIdpUserPoolAttribute(BaseModel): self._init_constraints(schema, default_constraints) def _init_constraints( - self, schema, default_constraints, show_empty_constraints=False - ): - def numeric_limit(num, constraint_type): + self, + schema: Dict[str, Any], + default_constraints: Any, + show_empty_constraints: bool = False, + ) -> None: + def numeric_limit(num: Optional[str], constraint_type: str) -> Optional[int]: if not num: - return + return # type: ignore[return-value] parsed = None try: parsed = int(num) @@ -269,7 +273,9 @@ class CognitoIdpUserPoolAttribute(BaseModel): ) return parsed - self.string_constraints = {} if show_empty_constraints else None + self.string_constraints: Optional[Dict[str, Any]] = ( + {} if show_empty_constraints else None + ) self.number_constraints = None if "AttributeDataType" in schema: @@ -323,7 +329,7 @@ class CognitoIdpUserPoolAttribute(BaseModel): self.number_constraints = None self.string_constraints = None - def to_json(self): + def to_json(self) -> Dict[str, Any]: return { "Name": self.name, "AttributeDataType": self.data_type, @@ -335,7 +341,7 @@ class CognitoIdpUserPoolAttribute(BaseModel): } -DEFAULT_USER_POOL_CONFIG = { +DEFAULT_USER_POOL_CONFIG: Dict[str, Any] = { "Policies": { "PasswordPolicy": { "MinimumLength": 8, @@ -369,7 +375,9 @@ class CognitoIdpUserPool(BaseModel): MAX_ID_LENGTH = 56 - def __init__(self, account_id, region, name, extended_config): + def __init__( + self, account_id: str, region: str, name: str, extended_config: Dict[str, Any] + ): self.account_id = account_id self.region = region @@ -403,8 +411,8 @@ class CognitoIdpUserPool(BaseModel): self.last_modified_date = datetime.datetime.utcnow() self.mfa_config = "OFF" - self.sms_mfa_config = None - self.token_mfa_config = None + self.sms_mfa_config: Optional[Dict[str, Any]] = None + self.token_mfa_config: Optional[Dict[str, bool]] = None self.schema_attributes = {} for schema in self.extended_config.pop("Schema", {}): @@ -426,14 +434,14 @@ class CognitoIdpUserPool(BaseModel): standard_attribute_name, False, standard_attribute_schema ) - self.clients = OrderedDict() - self.identity_providers = OrderedDict() - self.groups = OrderedDict() - self.users = OrderedDict() - self.resource_servers = OrderedDict() - self.refresh_tokens = {} - self.access_tokens = {} - self.id_tokens = {} + self.clients: Dict[str, CognitoIdpUserPoolClient] = OrderedDict() + self.identity_providers: Dict[str, CognitoIdpIdentityProvider] = OrderedDict() + self.groups: Dict[str, CognitoIdpGroup] = OrderedDict() + self.users: Dict[str, CognitoIdpUser] = OrderedDict() + self.resource_servers: Dict[str, CognitoResourceServer] = OrderedDict() + self.refresh_tokens: Dict[str, Optional[Tuple[str, str]]] = {} + self.access_tokens: Dict[str, Tuple[str, str]] = {} + self.id_tokens: Dict[str, Tuple[str, str]] = {} with open( os.path.join(os.path.dirname(__file__), "resources/jwks-private.json") @@ -441,11 +449,11 @@ class CognitoIdpUserPool(BaseModel): self.json_web_key = json.loads(f.read()) @property - def backend(self): + def backend(self) -> "CognitoIdpBackend": return cognitoidp_backends[self.account_id][self.region] @property - def domain(self): + def domain(self) -> Optional["CognitoIdpUserPoolDomain"]: return next( ( upd @@ -455,7 +463,7 @@ class CognitoIdpUserPool(BaseModel): None, ) - def _account_recovery_setting(self): + def _account_recovery_setting(self) -> Any: # AccountRecoverySetting is not present in DescribeUserPool response if the pool was created without # specifying it, ForgotPassword works on default settings nonetheless return self.extended_config.get( @@ -468,7 +476,7 @@ class CognitoIdpUserPool(BaseModel): }, ) - def _base_json(self): + def _base_json(self) -> Dict[str, Any]: return { "Id": self.id, "Arn": self.arn, @@ -480,7 +488,7 @@ class CognitoIdpUserPool(BaseModel): "EstimatedNumberOfUsers": len(self.users), } - def to_json(self, extended=False): + def to_json(self, extended: bool = False) -> Dict[str, Any]: user_pool_json = self._base_json() if extended: user_pool_json.update(self.extended_config) @@ -499,7 +507,7 @@ class CognitoIdpUserPool(BaseModel): user_pool_json["Domain"] = self.domain.domain return user_pool_json - def _get_user(self, username): + def _get_user(self, username: str) -> "CognitoIdpUser": """Find a user within a user pool by Username or any UsernameAttributes (`email` or `phone_number` or both)""" if self.extended_config.get("UsernameAttributes"): @@ -511,11 +519,16 @@ class CognitoIdpUserPool(BaseModel): ]: return user - return self.users.get(username) + return self.users.get(username) # type: ignore[return-value] def create_jwt( - self, client_id, username, token_use, expires_in=60 * 60, extra_data=None - ): + self, + client_id: str, + username: str, + token_use: str, + expires_in: int = 60 * 60, + extra_data: Optional[Dict[str, Any]] = None, + ) -> Tuple[str, int]: now = int(time.time()) payload = { "iss": "https://cognito-idp.{}.amazonaws.com/{}".format( @@ -536,7 +549,7 @@ class CognitoIdpUserPool(BaseModel): expires_in, ) - def add_custom_attributes(self, custom_attributes): + def add_custom_attributes(self, custom_attributes: List[Dict[str, str]]) -> None: attributes = [] for attribute_schema in custom_attributes: base_name = attribute_schema["Name"] @@ -552,7 +565,7 @@ class CognitoIdpUserPool(BaseModel): for attribute in attributes: self.schema_attributes[attribute.name] = attribute - def create_id_token(self, client_id, username): + def create_id_token(self, client_id: str, username: str) -> Tuple[str, int]: extra_data = self.get_user_extra_data_by_client_id(client_id, username) id_token, expires_in = self.create_jwt( client_id, username, "id", extra_data=extra_data @@ -560,12 +573,12 @@ class CognitoIdpUserPool(BaseModel): self.id_tokens[id_token] = (client_id, username) return id_token, expires_in - def create_refresh_token(self, client_id, username): + def create_refresh_token(self, client_id: str, username: str) -> str: refresh_token = str(random.uuid4()) self.refresh_tokens[refresh_token] = (client_id, username) return refresh_token - def create_access_token(self, client_id, username): + def create_access_token(self, client_id: str, username: str) -> Tuple[str, int]: extra_data = {} user = self._get_user(username) if len(user.groups) > 0: @@ -577,10 +590,13 @@ class CognitoIdpUserPool(BaseModel): self.access_tokens[access_token] = (client_id, username) return access_token, expires_in - def create_tokens_from_refresh_token(self, refresh_token): - if self.refresh_tokens.get(refresh_token) is None: + def create_tokens_from_refresh_token( + self, refresh_token: str + ) -> Tuple[str, str, int]: + res = self.refresh_tokens[refresh_token] + if res is None: raise NotAuthorizedError(refresh_token) - client_id, username = self.refresh_tokens.get(refresh_token) + client_id, username = res if not username: raise NotAuthorizedError(refresh_token) @@ -588,7 +604,9 @@ class CognitoIdpUserPool(BaseModel): id_token, _ = self.create_id_token(client_id, username) return access_token, id_token, expires_in - def get_user_extra_data_by_client_id(self, client_id, username): + def get_user_extra_data_by_client_id( + self, client_id: str, username: str + ) -> Dict[str, Any]: extra_data = {} current_client = self.clients.get(client_id, None) if current_client: @@ -603,7 +621,7 @@ class CognitoIdpUserPool(BaseModel): extra_data.update({attribute[0]["Name"]: attribute[0]["Value"]}) return extra_data - def sign_out(self, username): + def sign_out(self, username: str) -> None: for token, token_tuple in list(self.refresh_tokens.items()): if token_tuple is None: continue @@ -613,12 +631,17 @@ class CognitoIdpUserPool(BaseModel): class CognitoIdpUserPoolDomain(BaseModel): - def __init__(self, user_pool_id, domain, custom_domain_config=None): + def __init__( + self, + user_pool_id: str, + domain: str, + custom_domain_config: Optional[Dict[str, Any]] = None, + ): self.user_pool_id = user_pool_id self.domain = domain self.custom_domain_config = custom_domain_config or {} - def _distribution_name(self): + def _distribution_name(self) -> str: if self.custom_domain_config and "CertificateArn" in self.custom_domain_config: unique_hash = md5_hash( self.custom_domain_config["CertificateArn"].encode("utf-8") @@ -627,7 +650,7 @@ class CognitoIdpUserPoolDomain(BaseModel): unique_hash = md5_hash(self.user_pool_id.encode("utf-8")).hexdigest() return f"{unique_hash[:16]}.amazoncognito.com" - def to_json(self, extended=True): + def to_json(self, extended: bool = True) -> Dict[str, Any]: distribution = self._distribution_name() if extended: return { @@ -639,27 +662,31 @@ class CognitoIdpUserPoolDomain(BaseModel): "Status": "ACTIVE", "Version": None, } - elif distribution: + else: return {"CloudFrontDomain": distribution} - return None class CognitoIdpUserPoolClient(BaseModel): - def __init__(self, user_pool_id, generate_secret, extended_config): + def __init__( + self, + user_pool_id: str, + generate_secret: bool, + extended_config: Optional[Dict[str, Any]], + ): self.user_pool_id = user_pool_id self.id = create_id() self.secret = str(random.uuid4()) self.generate_secret = generate_secret or False self.extended_config = extended_config or {} - def _base_json(self): + def _base_json(self) -> Dict[str, Any]: return { "ClientId": self.id, "ClientName": self.extended_config.get("ClientName"), "UserPoolId": self.user_pool_id, } - def to_json(self, extended=False): + def to_json(self, extended: bool = False) -> Dict[str, Any]: user_pool_client_json = self._base_json() if self.generate_secret: user_pool_client_json.update({"ClientSecret": self.secret}) @@ -668,12 +695,12 @@ class CognitoIdpUserPoolClient(BaseModel): return user_pool_client_json - def get_readable_fields(self): + def get_readable_fields(self) -> List[str]: return self.extended_config.get("ReadAttributes", []) class CognitoIdpIdentityProvider(BaseModel): - def __init__(self, name, extended_config): + def __init__(self, name: str, extended_config: Optional[Dict[str, Any]]): self.name = name self.extended_config = extended_config or {} self.creation_date = datetime.datetime.utcnow() @@ -682,7 +709,7 @@ class CognitoIdpIdentityProvider(BaseModel): if "AttributeMapping" not in self.extended_config: self.extended_config["AttributeMapping"] = {"username": "sub"} - def _base_json(self): + def _base_json(self) -> Dict[str, Any]: return { "ProviderName": self.name, "ProviderType": self.extended_config.get("ProviderType"), @@ -690,7 +717,7 @@ class CognitoIdpIdentityProvider(BaseModel): "LastModifiedDate": time.mktime(self.last_modified_date.timetuple()), } - def to_json(self, extended=False): + def to_json(self, extended: bool = False) -> Dict[str, Any]: identity_provider_json = self._base_json() if extended: identity_provider_json.update(self.extended_config) @@ -699,7 +726,14 @@ class CognitoIdpIdentityProvider(BaseModel): class CognitoIdpGroup(BaseModel): - def __init__(self, user_pool_id, group_name, description, role_arn, precedence): + def __init__( + self, + user_pool_id: str, + group_name: str, + description: str, + role_arn: str, + precedence: int, + ): self.user_pool_id = user_pool_id self.group_name = group_name self.description = description or "" @@ -710,9 +744,14 @@ class CognitoIdpGroup(BaseModel): # Users who are members of this group. # Note that these links are bidirectional. - self.users = set() + self.users: Set[CognitoIdpUser] = set() - def update(self, description, role_arn, precedence): + def update( + self, + description: Optional[str], + role_arn: Optional[str], + precedence: Optional[int], + ) -> None: if description is not None: self.description = description if role_arn is not None: @@ -721,7 +760,7 @@ class CognitoIdpGroup(BaseModel): self.precedence = precedence self.last_modified_date = datetime.datetime.now() - def to_json(self): + def to_json(self) -> Dict[str, Any]: return { "GroupName": self.group_name, "UserPoolId": self.user_pool_id, @@ -734,7 +773,14 @@ class CognitoIdpGroup(BaseModel): class CognitoIdpUser(BaseModel): - def __init__(self, user_pool_id, username, password, status, attributes): + def __init__( + self, + user_pool_id: str, + username: Optional[str], + password: Optional[str], + status: str, + attributes: List[Dict[str, str]], + ): self.id = str(random.uuid4()) self.user_pool_id = user_pool_id # Username is None when users sign up with an email or phone_number, @@ -750,16 +796,16 @@ class CognitoIdpUser(BaseModel): self.sms_mfa_enabled = False self.software_token_mfa_enabled = False self.token_verified = False - self.confirmation_code = None - self.preferred_mfa_setting = None + self.confirmation_code: Optional[str] = None + self.preferred_mfa_setting: Optional[str] = None # Groups this user is a member of. # Note that these links are bidirectional. - self.groups = set() + self.groups: Set[CognitoIdpGroup] = set() self.update_attributes([{"Name": "sub", "Value": self.id}]) - def _base_json(self): + def _base_json(self) -> Dict[str, Any]: return { "UserPoolId": self.user_pool_id, "Username": self.username, @@ -770,8 +816,11 @@ class CognitoIdpUser(BaseModel): # list_users brings back "Attributes" while admin_get_user brings back "UserAttributes". def to_json( - self, extended=False, attributes_key="Attributes", attributes_to_get=None - ): + self, + extended: bool = False, + attributes_key: str = "Attributes", + attributes_to_get: Optional[List[str]] = None, + ) -> Dict[str, Any]: user_mfa_setting_list = [] if self.software_token_mfa_enabled: user_mfa_setting_list.append("SOFTWARE_TOKEN_MFA") @@ -796,13 +845,13 @@ class CognitoIdpUser(BaseModel): return user_json - def update_attributes(self, new_attributes): + def update_attributes(self, new_attributes: List[Dict[str, Any]]) -> None: flat_attributes = flatten_attrs(self.attributes) flat_attributes.update(flatten_attrs(new_attributes)) self.attribute_lookup = flat_attributes self.attributes = expand_attrs(flat_attributes) - def delete_attributes(self, attrs_to_delete): + def delete_attributes(self, attrs_to_delete: List[str]) -> None: flat_attributes = flatten_attrs(self.attributes) wrong_attrs = [] for attr in attrs_to_delete: @@ -826,14 +875,20 @@ class CognitoIdpUser(BaseModel): class CognitoResourceServer(BaseModel): - def __init__(self, user_pool_id, identifier, name, scopes): + def __init__( + self, + user_pool_id: str, + identifier: str, + name: str, + scopes: List[Dict[str, str]], + ): self.user_pool_id = user_pool_id self.identifier = identifier self.name = name self.scopes = scopes - def to_json(self): - res = { + def to_json(self) -> Dict[str, Any]: + res: Dict[str, Any] = { "UserPoolId": self.user_pool_id, "Identifier": self.identifier, "Name": self.name, @@ -853,14 +908,16 @@ class CognitoIdpBackend(BaseBackend): This behavior can be enabled by passing the environment variable: MOTO_COGNITO_IDP_USER_POOL_ID_STRATEGY=HASH. """ - def __init__(self, region_name, account_id): + def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) - self.user_pools = OrderedDict() - self.user_pool_domains = OrderedDict() - self.sessions = {} + self.user_pools: Dict[str, CognitoIdpUserPool] = OrderedDict() + self.user_pool_domains: Dict[str, CognitoIdpUserPoolDomain] = OrderedDict() + self.sessions: Dict[str, CognitoIdpUserPool] = {} # User pool - def create_user_pool(self, name, extended_config): + def create_user_pool( + self, name: str, extended_config: Dict[str, Any] + ) -> CognitoIdpUserPool: user_pool = CognitoIdpUserPool( self.account_id, self.region_name, name, extended_config ) @@ -868,8 +925,12 @@ class CognitoIdpBackend(BaseBackend): return user_pool def set_user_pool_mfa_config( - self, user_pool_id, sms_config, token_config, mfa_config - ): + self, + user_pool_id: str, + sms_config: Dict[str, Any], + token_config: Dict[str, bool], + mfa_config: str, + ) -> Dict[str, Any]: user_pool = self.describe_user_pool(user_pool_id) user_pool.mfa_config = mfa_config user_pool.sms_mfa_config = sms_config @@ -877,7 +938,7 @@ class CognitoIdpBackend(BaseBackend): return self.get_user_pool_mfa_config(user_pool_id) - def get_user_pool_mfa_config(self, user_pool_id): + def get_user_pool_mfa_config(self, user_pool_id: str) -> Dict[str, Any]: user_pool = self.describe_user_pool(user_pool_id) return { @@ -887,27 +948,34 @@ class CognitoIdpBackend(BaseBackend): } @paginate(pagination_model=PAGINATION_MODEL) - def list_user_pools(self): + def list_user_pools(self) -> List[CognitoIdpUserPool]: # type: ignore[misc] return list(self.user_pools.values()) - def describe_user_pool(self, user_pool_id): + def describe_user_pool(self, user_pool_id: str) -> CognitoIdpUserPool: user_pool = self.user_pools.get(user_pool_id) if not user_pool: raise ResourceNotFoundError(f"User pool {user_pool_id} does not exist.") return user_pool - def update_user_pool(self, user_pool_id, extended_config): + def update_user_pool( + self, user_pool_id: str, extended_config: Dict[str, Any] + ) -> None: user_pool = self.describe_user_pool(user_pool_id) user_pool.extended_config = extended_config - def delete_user_pool(self, user_pool_id): + def delete_user_pool(self, user_pool_id: str) -> None: self.describe_user_pool(user_pool_id) del self.user_pools[user_pool_id] # User pool domain - def create_user_pool_domain(self, user_pool_id, domain, custom_domain_config=None): + def create_user_pool_domain( + self, + user_pool_id: str, + domain: str, + custom_domain_config: Optional[Dict[str, str]] = None, + ) -> CognitoIdpUserPoolDomain: self.describe_user_pool(user_pool_id) user_pool_domain = CognitoIdpUserPoolDomain( @@ -916,19 +984,23 @@ class CognitoIdpBackend(BaseBackend): self.user_pool_domains[domain] = user_pool_domain return user_pool_domain - def describe_user_pool_domain(self, domain): + def describe_user_pool_domain( + self, domain: str + ) -> Optional[CognitoIdpUserPoolDomain]: if domain not in self.user_pool_domains: return None return self.user_pool_domains[domain] - def delete_user_pool_domain(self, domain): + def delete_user_pool_domain(self, domain: str) -> None: if domain not in self.user_pool_domains: raise ResourceNotFoundError(domain) del self.user_pool_domains[domain] - def update_user_pool_domain(self, domain, custom_domain_config): + def update_user_pool_domain( + self, domain: str, custom_domain_config: Dict[str, str] + ) -> CognitoIdpUserPoolDomain: if domain not in self.user_pool_domains: raise ResourceNotFoundError(domain) @@ -937,7 +1009,9 @@ class CognitoIdpBackend(BaseBackend): return user_pool_domain # User pool client - def create_user_pool_client(self, user_pool_id, generate_secret, extended_config): + def create_user_pool_client( + self, user_pool_id: str, generate_secret: bool, extended_config: Dict[str, str] + ) -> CognitoIdpUserPoolClient: user_pool = self.describe_user_pool(user_pool_id) user_pool_client = CognitoIdpUserPoolClient( @@ -947,12 +1021,14 @@ class CognitoIdpBackend(BaseBackend): return user_pool_client @paginate(pagination_model=PAGINATION_MODEL) - def list_user_pool_clients(self, user_pool_id): + def list_user_pool_clients(self, user_pool_id: str) -> List[CognitoIdpUserPoolClient]: # type: ignore[misc] user_pool = self.describe_user_pool(user_pool_id) return list(user_pool.clients.values()) - def describe_user_pool_client(self, user_pool_id, client_id): + def describe_user_pool_client( + self, user_pool_id: str, client_id: str + ) -> CognitoIdpUserPoolClient: user_pool = self.describe_user_pool(user_pool_id) client = user_pool.clients.get(client_id) @@ -961,7 +1037,9 @@ class CognitoIdpBackend(BaseBackend): return client - def update_user_pool_client(self, user_pool_id, client_id, extended_config): + def update_user_pool_client( + self, user_pool_id: str, client_id: str, extended_config: Dict[str, str] + ) -> CognitoIdpUserPoolClient: user_pool = self.describe_user_pool(user_pool_id) client = user_pool.clients.get(client_id) @@ -971,7 +1049,7 @@ class CognitoIdpBackend(BaseBackend): client.extended_config.update(extended_config) return client - def delete_user_pool_client(self, user_pool_id, client_id): + def delete_user_pool_client(self, user_pool_id: str, client_id: str) -> None: user_pool = self.describe_user_pool(user_pool_id) if client_id not in user_pool.clients: @@ -980,7 +1058,9 @@ class CognitoIdpBackend(BaseBackend): del user_pool.clients[client_id] # Identity provider - def create_identity_provider(self, user_pool_id, name, extended_config): + def create_identity_provider( + self, user_pool_id: str, name: str, extended_config: Dict[str, str] + ) -> CognitoIdpIdentityProvider: user_pool = self.describe_user_pool(user_pool_id) identity_provider = CognitoIdpIdentityProvider(name, extended_config) @@ -988,12 +1068,14 @@ class CognitoIdpBackend(BaseBackend): return identity_provider @paginate(pagination_model=PAGINATION_MODEL) - def list_identity_providers(self, user_pool_id): + def list_identity_providers(self, user_pool_id: str) -> List[CognitoIdpIdentityProvider]: # type: ignore[misc] user_pool = self.describe_user_pool(user_pool_id) return list(user_pool.identity_providers.values()) - def describe_identity_provider(self, user_pool_id, name): + def describe_identity_provider( + self, user_pool_id: str, name: str + ) -> CognitoIdpIdentityProvider: user_pool = self.describe_user_pool(user_pool_id) identity_provider = user_pool.identity_providers.get(name) @@ -1002,7 +1084,9 @@ class CognitoIdpBackend(BaseBackend): return identity_provider - def update_identity_provider(self, user_pool_id, name, extended_config): + def update_identity_provider( + self, user_pool_id: str, name: str, extended_config: Dict[str, str] + ) -> CognitoIdpIdentityProvider: user_pool = self.describe_user_pool(user_pool_id) identity_provider = user_pool.identity_providers.get(name) @@ -1013,7 +1097,7 @@ class CognitoIdpBackend(BaseBackend): return identity_provider - def delete_identity_provider(self, user_pool_id, name): + def delete_identity_provider(self, user_pool_id: str, name: str) -> None: user_pool = self.describe_user_pool(user_pool_id) if name not in user_pool.identity_providers: @@ -1022,7 +1106,14 @@ class CognitoIdpBackend(BaseBackend): del user_pool.identity_providers[name] # Group - def create_group(self, user_pool_id, group_name, description, role_arn, precedence): + def create_group( + self, + user_pool_id: str, + group_name: str, + description: str, + role_arn: str, + precedence: int, + ) -> CognitoIdpGroup: user_pool = self.describe_user_pool(user_pool_id) group = CognitoIdpGroup( @@ -1034,7 +1125,7 @@ class CognitoIdpBackend(BaseBackend): return group - def get_group(self, user_pool_id, group_name): + def get_group(self, user_pool_id: str, group_name: str) -> CognitoIdpGroup: user_pool = self.describe_user_pool(user_pool_id) if group_name not in user_pool.groups: @@ -1043,12 +1134,12 @@ class CognitoIdpBackend(BaseBackend): return user_pool.groups[group_name] @paginate(pagination_model=PAGINATION_MODEL) - def list_groups(self, user_pool_id): + def list_groups(self, user_pool_id: str) -> List[CognitoIdpGroup]: # type: ignore[misc] user_pool = self.describe_user_pool(user_pool_id) return list(user_pool.groups.values()) - def delete_group(self, user_pool_id, group_name): + def delete_group(self, user_pool_id: str, group_name: str) -> None: user_pool = self.describe_user_pool(user_pool_id) if group_name not in user_pool.groups: @@ -1060,14 +1151,23 @@ class CognitoIdpBackend(BaseBackend): del user_pool.groups[group_name] - def update_group(self, user_pool_id, group_name, description, role_arn, precedence): + def update_group( + self, + user_pool_id: str, + group_name: str, + description: str, + role_arn: str, + precedence: int, + ) -> CognitoIdpGroup: group = self.get_group(user_pool_id, group_name) group.update(description, role_arn, precedence) return group - def admin_add_user_to_group(self, user_pool_id, group_name, username): + def admin_add_user_to_group( + self, user_pool_id: str, group_name: str, username: str + ) -> None: group = self.get_group(user_pool_id, group_name) user = self.admin_get_user(user_pool_id, username) @@ -1075,23 +1175,27 @@ class CognitoIdpBackend(BaseBackend): user.groups.add(group) @paginate(pagination_model=PAGINATION_MODEL) - def list_users_in_group(self, user_pool_id, group_name): + def list_users_in_group(self, user_pool_id: str, group_name: str) -> List[CognitoIdpUser]: # type: ignore[misc] user_pool = self.describe_user_pool(user_pool_id) group = self.get_group(user_pool_id, group_name) return list(filter(lambda user: user in group.users, user_pool.users.values())) - def admin_list_groups_for_user(self, user_pool_id, username): + def admin_list_groups_for_user( + self, user_pool_id: str, username: str + ) -> List[CognitoIdpGroup]: user = self.admin_get_user(user_pool_id, username) return list(user.groups) - def admin_remove_user_from_group(self, user_pool_id, group_name, username): + def admin_remove_user_from_group( + self, user_pool_id: str, group_name: str, username: str + ) -> None: group = self.get_group(user_pool_id, group_name) user = self.admin_get_user(user_pool_id, username) group.users.discard(user) user.groups.discard(group) - def admin_reset_user_password(self, user_pool_id, username): + def admin_reset_user_password(self, user_pool_id: str, username: str) -> None: user = self.admin_get_user(user_pool_id, username) if not user.enabled: raise NotAuthorizedError("User is disabled") @@ -1112,8 +1216,13 @@ class CognitoIdpBackend(BaseBackend): # User def admin_create_user( - self, user_pool_id, username, message_action, temporary_password, attributes - ): + self, + user_pool_id: str, + username: str, + message_action: str, + temporary_password: str, + attributes: List[Dict[str, str]], + ) -> CognitoIdpUser: user_pool = self.describe_user_pool(user_pool_id) if message_action and message_action == "RESEND": @@ -1133,7 +1242,8 @@ class CognitoIdpBackend(BaseBackend): # `AccessToken` # # ref: https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-attributes.html#user-pool-settings-aliases-settings - if user_pool.extended_config.get("UsernameAttributes"): + has_username_attrs = user_pool.extended_config.get("UsernameAttributes") + if has_username_attrs: username_attributes = user_pool.extended_config["UsernameAttributes"] # attribute_type should be one of `email`, `phone_number` or both for attribute_type in username_attributes: @@ -1144,12 +1254,10 @@ class CognitoIdpBackend(BaseBackend): ): # insert provided username into new user's attributes under the # correct key - flattened_attrs = flatten_attrs(attributes or {}) + flattened_attrs = flatten_attrs(attributes or []) flattened_attrs.update({attribute_type: username}) attributes = expand_attrs(flattened_attrs) - # set username to None so that it will be default to the internal GUID - # when them user gets created - username = None + # once the username has been validated against a username attribute # type, there is no need to attempt validation against the other # type(s) @@ -1157,14 +1265,16 @@ class CognitoIdpBackend(BaseBackend): # The provided username has not matched the required format for any # of the possible attributes - if username is not None: + else: raise InvalidParameterException( "Username should be either an email or a phone number." ) user = CognitoIdpUser( user_pool_id, - username, + # set username to None so that it will be default to the internal GUID + # when them user gets created + None if has_username_attrs else username, temporary_password, UserStatus.FORCE_CHANGE_PASSWORD, attributes, @@ -1173,12 +1283,12 @@ class CognitoIdpBackend(BaseBackend): user_pool.users[user.username] = user return user - def admin_confirm_sign_up(self, user_pool_id, username): + def admin_confirm_sign_up(self, user_pool_id: str, username: str) -> str: user = self.admin_get_user(user_pool_id, username) user.status = UserStatus["CONFIRMED"] return "" - def admin_get_user(self, user_pool_id, username): + def admin_get_user(self, user_pool_id: str, username: str) -> CognitoIdpUser: user_pool = self.describe_user_pool(user_pool_id) user = user_pool._get_user(username) @@ -1186,7 +1296,7 @@ class CognitoIdpBackend(BaseBackend): raise UserNotFoundError("User does not exist.") return user - def get_user(self, access_token): + def get_user(self, access_token: str) -> CognitoIdpUser: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] @@ -1201,20 +1311,20 @@ class CognitoIdpBackend(BaseBackend): raise NotAuthorizedError("Invalid token") @paginate(pagination_model=PAGINATION_MODEL) - def list_users(self, user_pool_id): + def list_users(self, user_pool_id: str) -> List[CognitoIdpUser]: # type: ignore[misc] user_pool = self.describe_user_pool(user_pool_id) return list(user_pool.users.values()) - def admin_disable_user(self, user_pool_id, username): + def admin_disable_user(self, user_pool_id: str, username: str) -> None: user = self.admin_get_user(user_pool_id, username) user.enabled = False - def admin_enable_user(self, user_pool_id, username): + def admin_enable_user(self, user_pool_id: str, username: str) -> None: user = self.admin_get_user(user_pool_id, username) user.enabled = True - def admin_delete_user(self, user_pool_id, username): + def admin_delete_user(self, user_pool_id: str, username: str) -> None: user_pool = self.describe_user_pool(user_pool_id) user = self.admin_get_user(user_pool_id, username) @@ -1224,7 +1334,12 @@ class CognitoIdpBackend(BaseBackend): # use internal username del user_pool.users[user.username] - def _log_user_in(self, user_pool, client, username): + def _log_user_in( + self, + user_pool: CognitoIdpUserPool, + client: CognitoIdpUserPoolClient, + username: str, + ) -> Dict[str, Dict[str, Any]]: refresh_token = user_pool.create_refresh_token(client.id, username) access_token, id_token, expires_in = user_pool.create_tokens_from_refresh_token( refresh_token @@ -1259,7 +1374,13 @@ class CognitoIdpBackend(BaseBackend): return auth_flow - def admin_initiate_auth(self, user_pool_id, client_id, auth_flow, auth_parameters): + def admin_initiate_auth( + self, + user_pool_id: str, + client_id: str, + auth_flow: str, + auth_parameters: Dict[str, str], + ) -> Dict[str, Any]: admin_auth_flows = [ AuthFlow.ADMIN_NO_SRP_AUTH, AuthFlow.ADMIN_USER_PASSWORD_AUTH, @@ -1277,8 +1398,8 @@ class CognitoIdpBackend(BaseBackend): raise ResourceNotFoundError(client_id) if auth_flow in (AuthFlow.ADMIN_USER_PASSWORD_AUTH, AuthFlow.ADMIN_NO_SRP_AUTH): - username = auth_parameters.get("USERNAME") - password = auth_parameters.get("PASSWORD") + username: str = auth_parameters.get("USERNAME") # type: ignore[assignment] + password: str = auth_parameters.get("PASSWORD") # type: ignore[assignment] user = self.admin_get_user(user_pool_id, username) if user.password != password: @@ -1299,7 +1420,7 @@ class CognitoIdpBackend(BaseBackend): return self._log_user_in(user_pool, client, username) elif auth_flow is AuthFlow.REFRESH_TOKEN: - refresh_token = auth_parameters.get("REFRESH_TOKEN") + refresh_token: str = auth_parameters.get("REFRESH_TOKEN") # type: ignore[assignment] ( access_token, id_token, @@ -1316,13 +1437,17 @@ class CognitoIdpBackend(BaseBackend): } else: # We shouldn't get here due to enum validation of auth_flow - return None + return None # type: ignore[return-value] def respond_to_auth_challenge( - self, session, client_id, challenge_name, challenge_responses - ): + self, + session: str, + client_id: str, + challenge_name: str, + challenge_responses: Dict[str, str], + ) -> Dict[str, Any]: if challenge_name == "PASSWORD_VERIFIER": - session = challenge_responses.get("PASSWORD_CLAIM_SECRET_BLOCK") + session = challenge_responses.get("PASSWORD_CLAIM_SECRET_BLOCK") # type: ignore[assignment] user_pool = self.sessions.get(session) if not user_pool: @@ -1333,7 +1458,7 @@ class CognitoIdpBackend(BaseBackend): raise ResourceNotFoundError(client_id) if challenge_name == "NEW_PASSWORD_REQUIRED": - username = challenge_responses.get("USERNAME") + username: str = challenge_responses.get("USERNAME") # type: ignore[assignment] new_password = challenge_responses.get("NEW_PASSWORD") user = self.admin_get_user(user_pool.id, username) @@ -1343,7 +1468,7 @@ class CognitoIdpBackend(BaseBackend): return self._log_user_in(user_pool, client, username) elif challenge_name == "PASSWORD_VERIFIER": - username = challenge_responses.get("USERNAME") + username: str = challenge_responses.get("USERNAME") # type: ignore[no-redef] user = self.admin_get_user(user_pool.id, username) password_claim_signature = challenge_responses.get( @@ -1377,7 +1502,7 @@ class CognitoIdpBackend(BaseBackend): del self.sessions[session] return self._log_user_in(user_pool, client, username) elif challenge_name == "SOFTWARE_TOKEN_MFA": - username = challenge_responses.get("USERNAME") + username: str = challenge_responses.get("USERNAME") # type: ignore[no-redef] self.admin_get_user(user_pool.id, username) software_token_mfa_code = challenge_responses.get("SOFTWARE_TOKEN_MFA_CODE") @@ -1397,7 +1522,9 @@ class CognitoIdpBackend(BaseBackend): else: return {} - def confirm_forgot_password(self, client_id, username, password, confirmation_code): + def confirm_forgot_password( + self, client_id: str, username: str, password: str, confirmation_code: str + ) -> None: for user_pool in self.user_pools.values(): if client_id in user_pool.clients and user_pool._get_user(username): user = user_pool._get_user(username) @@ -1414,7 +1541,9 @@ class CognitoIdpBackend(BaseBackend): else: raise ResourceNotFoundError(client_id) - def forgot_password(self, client_id, username): + def forgot_password( + self, client_id: str, username: str + ) -> Tuple[Optional[str], Dict[str, Any]]: """ The ForgotPassword operation is partially broken in AWS. If the input is 100% correct it works fine. @@ -1432,7 +1561,7 @@ class CognitoIdpBackend(BaseBackend): else: raise ResourceNotFoundError("Username/client id combination not found.") - confirmation_code = None + confirmation_code: Optional[str] = None if user: # An unfortunate bit of magic - confirmation_code is opt-in, as it's returned # via a "x-moto-forgot-password-confirmation-code" http header, which is not the AWS way (should be SES, SNS, Cognito built-in email) @@ -1466,7 +1595,9 @@ class CognitoIdpBackend(BaseBackend): } return confirmation_code, {"CodeDeliveryDetails": code_delivery_details} - def change_password(self, access_token, previous_password, proposed_password): + def change_password( + self, access_token: str, previous_password: str, proposed_password: str + ) -> None: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] @@ -1486,21 +1617,25 @@ class CognitoIdpBackend(BaseBackend): else: raise NotAuthorizedError(access_token) - def admin_update_user_attributes(self, user_pool_id, username, attributes): + def admin_update_user_attributes( + self, user_pool_id: str, username: str, attributes: List[Dict[str, str]] + ) -> None: user = self.admin_get_user(user_pool_id, username) user.update_attributes(attributes) - def admin_delete_user_attributes(self, user_pool_id, username, attributes): + def admin_delete_user_attributes( + self, user_pool_id: str, username: str, attributes: List[str] + ) -> None: self.admin_get_user(user_pool_id, username).delete_attributes(attributes) - def admin_user_global_sign_out(self, user_pool_id, username): + def admin_user_global_sign_out(self, user_pool_id: str, username: str) -> None: user_pool = self.describe_user_pool(user_pool_id) self.admin_get_user(user_pool_id, username) user_pool.sign_out(username) - def global_sign_out(self, access_token): + def global_sign_out(self, access_token: str) -> None: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] @@ -1509,7 +1644,13 @@ class CognitoIdpBackend(BaseBackend): raise NotAuthorizedError(access_token) - def create_resource_server(self, user_pool_id, identifier, name, scopes): + def create_resource_server( + self, + user_pool_id: str, + identifier: str, + name: str, + scopes: List[Dict[str, str]], + ) -> CognitoResourceServer: user_pool = self.describe_user_pool(user_pool_id) if identifier in user_pool.resource_servers: @@ -1521,7 +1662,13 @@ class CognitoIdpBackend(BaseBackend): user_pool.resource_servers[identifier] = resource_server return resource_server - def sign_up(self, client_id, username, password, attributes): + def sign_up( + self, + client_id: str, + username: str, + password: str, + attributes: List[Dict[str, str]], + ) -> CognitoIdpUser: user_pool = None for p in self.user_pools.values(): if client_id in p.clients: @@ -1543,7 +1690,8 @@ class CognitoIdpBackend(BaseBackend): # `AccessToken` # # ref: https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-settings-attributes.html#user-pool-settings-aliases-settings - if user_pool.extended_config.get("UsernameAttributes"): + has_username_attrs = user_pool.extended_config.get("UsernameAttributes") + if has_username_attrs: username_attributes = user_pool.extended_config["UsernameAttributes"] # attribute_type should be one of `email`, `phone_number` or both for attribute_type in username_attributes: @@ -1554,27 +1702,27 @@ class CognitoIdpBackend(BaseBackend): ): # insert provided username into new user's attributes under the # correct key - flattened_attrs = flatten_attrs(attributes or {}) + flattened_attrs = flatten_attrs(attributes or []) flattened_attrs.update({attribute_type: username}) attributes = expand_attrs(flattened_attrs) - # set username to None so that it will be default to the internal GUID - # when them user gets created - username = None + # once the username has been validated against a username attribute # type, there is no need to attempt validation against the other # type(s) break - # The provided username has not matched the required format for any - # of the possible attributes - if username is not None: + else: + # The provided username has not matched the required format for any + # of the possible attributes raise InvalidParameterException( "Username should be either an email or a phone number." ) user = CognitoIdpUser( user_pool_id=user_pool.id, - username=username, + # set username to None so that it will be default to the internal GUID + # when them user gets created + username=None if has_username_attrs else username, password=password, attributes=attributes, status=UserStatus.UNCONFIRMED, @@ -1582,7 +1730,7 @@ class CognitoIdpBackend(BaseBackend): user_pool.users[user.username] = user return user - def confirm_sign_up(self, client_id, username): + def confirm_sign_up(self, client_id: str, username: str) -> str: user_pool = None for p in self.user_pools.values(): if client_id in p.clients: @@ -1595,7 +1743,9 @@ class CognitoIdpBackend(BaseBackend): user.status = UserStatus.CONFIRMED return "" - def initiate_auth(self, client_id, auth_flow, auth_parameters): + def initiate_auth( + self, client_id: str, auth_flow: str, auth_parameters: Dict[str, str] + ) -> Dict[str, Any]: user_auth_flows = [ AuthFlow.USER_SRP_AUTH, AuthFlow.REFRESH_TOKEN_AUTH, @@ -1608,28 +1758,28 @@ class CognitoIdpBackend(BaseBackend): auth_flow=auth_flow, valid_flows=user_auth_flows ) - user_pool = None - client = None + user_pool: Optional[CognitoIdpUserPool] = None + client: CognitoIdpUserPoolClient = None # type: ignore[assignment] for p in self.user_pools.values(): if client_id in p.clients: user_pool = p - client = p.clients.get(client_id) + client = p.clients[client_id] if user_pool is None: raise ResourceNotFoundError(client_id) if auth_flow is AuthFlow.USER_SRP_AUTH: - username = auth_parameters.get("USERNAME") + username: str = auth_parameters.get("USERNAME") # type: ignore[assignment] srp_a = auth_parameters.get("SRP_A") if not srp_a: raise ResourceNotFoundError(srp_a) if client.generate_secret: - secret_hash = auth_parameters.get("SECRET_HASH") + secret_hash: str = auth_parameters.get("SECRET_HASH") # type: ignore[assignment] if not check_secret_hash( - client.secret, client.id, username, secret_hash + client.secret, client.id, username, secret_hash # type: ignore[arg-type] ): - raise NotAuthorizedError(secret_hash) + raise NotAuthorizedError(secret_hash) # type: ignore[arg-type] - user = self.admin_get_user(user_pool.id, username) + user = self.admin_get_user(user_pool.id, username) # type: ignore[arg-type] if user.status is UserStatus.UNCONFIRMED: raise UserNotConfirmedException("User is not confirmed.") @@ -1649,8 +1799,8 @@ class CognitoIdpBackend(BaseBackend): }, } elif auth_flow is AuthFlow.USER_PASSWORD_AUTH: - username = auth_parameters.get("USERNAME") - password = auth_parameters.get("PASSWORD") + username: str = auth_parameters.get("USERNAME") # type: ignore[no-redef] + password: str = auth_parameters.get("PASSWORD") # type: ignore[assignment] user = self.admin_get_user(user_pool.id, username) @@ -1677,14 +1827,14 @@ class CognitoIdpBackend(BaseBackend): client_id, username ) id_token, _ = user_pool.create_id_token(client_id, username) - refresh_token = user_pool.create_refresh_token(client_id, username) + new_refresh_token = user_pool.create_refresh_token(client_id, username) return { "AuthenticationResult": { "IdToken": id_token, "AccessToken": access_token, "ExpiresIn": expires_in, - "RefreshToken": refresh_token, + "RefreshToken": new_refresh_token, "TokenType": "Bearer", } } @@ -1693,15 +1843,16 @@ class CognitoIdpBackend(BaseBackend): if not refresh_token: raise ResourceNotFoundError(refresh_token) - if user_pool.refresh_tokens[refresh_token] is None: + res = user_pool.refresh_tokens[refresh_token] + if res is None: raise NotAuthorizedError("Refresh Token has been revoked") - client_id, username = user_pool.refresh_tokens[refresh_token] + client_id, username = res if not username: raise ResourceNotFoundError(username) if client.generate_secret: - secret_hash = auth_parameters.get("SECRET_HASH") + secret_hash: str = auth_parameters.get("SECRET_HASH") # type: ignore[no-redef] if not check_secret_hash( client.secret, client.id, username, secret_hash ): @@ -1723,9 +1874,9 @@ class CognitoIdpBackend(BaseBackend): } else: # We shouldn't get here due to enum validation of auth_flow - return None + return None # type: ignore[return-value] - def associate_software_token(self, access_token): + def associate_software_token(self, access_token: str) -> Dict[str, str]: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] @@ -1735,7 +1886,7 @@ class CognitoIdpBackend(BaseBackend): raise NotAuthorizedError(access_token) - def verify_software_token(self, access_token): + def verify_software_token(self, access_token: str) -> Dict[str, str]: """ The parameter UserCode has not yet been implemented """ @@ -1751,8 +1902,11 @@ class CognitoIdpBackend(BaseBackend): raise NotAuthorizedError(access_token) def set_user_mfa_preference( - self, access_token, software_token_mfa_settings, sms_mfa_settings - ): + self, + access_token: str, + software_token_mfa_settings: Dict[str, bool], + sms_mfa_settings: Dict[str, bool], + ) -> None: for user_pool in self.user_pools.values(): if access_token in user_pool.access_tokens: _, username = user_pool.access_tokens[access_token] @@ -1780,8 +1934,12 @@ class CognitoIdpBackend(BaseBackend): raise NotAuthorizedError(access_token) def admin_set_user_mfa_preference( - self, user_pool_id, username, software_token_mfa_settings, sms_mfa_settings - ): + self, + user_pool_id: str, + username: str, + software_token_mfa_settings: Dict[str, bool], + sms_mfa_settings: Dict[str, bool], + ) -> None: user = self.admin_get_user(user_pool_id, username) if software_token_mfa_settings and software_token_mfa_settings.get("Enabled"): @@ -1801,7 +1959,9 @@ class CognitoIdpBackend(BaseBackend): user.preferred_mfa_setting = "SMS_MFA" return None - def admin_set_user_password(self, user_pool_id, username, password, permanent): + def admin_set_user_password( + self, user_pool_id: str, username: str, password: str, permanent: bool + ) -> None: user = self.admin_get_user(user_pool_id, username) user.password = password if permanent: @@ -1809,11 +1969,15 @@ class CognitoIdpBackend(BaseBackend): else: user.status = UserStatus.FORCE_CHANGE_PASSWORD - def add_custom_attributes(self, user_pool_id, custom_attributes): + def add_custom_attributes( + self, user_pool_id: str, custom_attributes: List[Dict[str, Any]] + ) -> None: user_pool = self.describe_user_pool(user_pool_id) user_pool.add_custom_attributes(custom_attributes) - def update_user_attributes(self, access_token, attributes): + def update_user_attributes( + self, access_token: str, attributes: List[Dict[str, str]] + ) -> None: """ The parameter ClientMetadata has not yet been implemented. No CodeDeliveryDetails are returned. """ @@ -1833,7 +1997,7 @@ class RegionAgnosticBackend: # Without authentication-header, we lose the context of which region the request was send to # This backend will cycle through all backends as a workaround - def _find_backend_by_access_token(self, access_token): + def _find_backend_by_access_token(self, access_token: str) -> CognitoIdpBackend: for account_specific_backends in cognitoidp_backends.values(): for region, backend in account_specific_backends.items(): if region == "global": @@ -1843,7 +2007,7 @@ class RegionAgnosticBackend: return backend return backend - def _find_backend_for_clientid(self, client_id): + def _find_backend_for_clientid(self, client_id: str) -> CognitoIdpBackend: for account_specific_backends in cognitoidp_backends.values(): for region, backend in account_specific_backends.items(): if region == "global": @@ -1853,25 +2017,37 @@ class RegionAgnosticBackend: return backend return backend - def sign_up(self, client_id, username, password, attributes): + def sign_up( + self, + client_id: str, + username: str, + password: str, + attributes: List[Dict[str, str]], + ) -> CognitoIdpUser: backend = self._find_backend_for_clientid(client_id) return backend.sign_up(client_id, username, password, attributes) - def initiate_auth(self, client_id, auth_flow, auth_parameters): + def initiate_auth( + self, client_id: str, auth_flow: str, auth_parameters: Dict[str, str] + ) -> Dict[str, Any]: backend = self._find_backend_for_clientid(client_id) return backend.initiate_auth(client_id, auth_flow, auth_parameters) - def confirm_sign_up(self, client_id, username): + def confirm_sign_up(self, client_id: str, username: str) -> str: backend = self._find_backend_for_clientid(client_id) return backend.confirm_sign_up(client_id, username) - def get_user(self, access_token): + def get_user(self, access_token: str) -> CognitoIdpUser: backend = self._find_backend_by_access_token(access_token) return backend.get_user(access_token) def respond_to_auth_challenge( - self, session, client_id, challenge_name, challenge_responses - ): + self, + session: str, + client_id: str, + challenge_name: str, + challenge_responses: Dict[str, str], + ) -> Dict[str, Any]: backend = self._find_backend_for_clientid(client_id) return backend.respond_to_auth_challenge( session, client_id, challenge_name, challenge_responses @@ -1884,7 +2060,7 @@ cognitoidp_backends = BackendDict(CognitoIdpBackend, "cognito-idp") # Hack to help moto-server process requests on localhost, where the region isn't # specified in the host header. Some endpoints (change password, confirm forgot # password) have no authorization header from which to extract the region. -def find_account_region_by_value(key, value): +def find_account_region_by_value(key: str, value: str) -> Tuple[str, str]: for account_id, account_specific_backend in cognitoidp_backends.items(): for region, backend in account_specific_backend.items(): for user_pool in backend.user_pools.values(): diff --git a/moto/cognitoidp/responses.py b/moto/cognitoidp/responses.py index a748f3db4..cb767d7ea 100644 --- a/moto/cognitoidp/responses.py +++ b/moto/cognitoidp/responses.py @@ -1,6 +1,7 @@ import json import os import re +from typing import Any, Dict, Tuple from moto.core.responses import BaseResponse from .models import ( @@ -8,6 +9,7 @@ from .models import ( find_account_region_by_value, RegionAgnosticBackend, UserStatus, + CognitoIdpBackend, ) from .exceptions import InvalidParameterException @@ -16,24 +18,24 @@ region_agnostic_backend = RegionAgnosticBackend() class CognitoIdpResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: super().__init__(service_name="cognito-idp") @property - def parameters(self): + def parameters(self) -> Dict[str, Any]: # type: ignore[misc] return json.loads(self.body) @property - def backend(self): + def backend(self) -> CognitoIdpBackend: return cognitoidp_backends[self.current_account][self.region] # User pool - def create_user_pool(self): + def create_user_pool(self) -> str: name = self.parameters.pop("PoolName") user_pool = self.backend.create_user_pool(name, self.parameters) return json.dumps({"UserPool": user_pool.to_json(extended=True)}) - def set_user_pool_mfa_config(self): + def set_user_pool_mfa_config(self) -> str: user_pool_id = self._get_param("UserPoolId") sms_config = self._get_param("SmsMfaConfiguration", None) token_config = self._get_param("SoftwareTokenMfaConfiguration", None) @@ -60,38 +62,40 @@ class CognitoIdpResponse(BaseResponse): ) return json.dumps(response) - def get_user_pool_mfa_config(self): + def get_user_pool_mfa_config(self) -> str: user_pool_id = self._get_param("UserPoolId") response = self.backend.get_user_pool_mfa_config(user_pool_id) return json.dumps(response) - def list_user_pools(self): + def list_user_pools(self) -> str: max_results = self._get_param("MaxResults") next_token = self._get_param("NextToken") user_pools, next_token = self.backend.list_user_pools( max_results=max_results, next_token=next_token ) - response = {"UserPools": [user_pool.to_json() for user_pool in user_pools]} + response: Dict[str, Any] = { + "UserPools": [user_pool.to_json() for user_pool in user_pools] + } if next_token: response["NextToken"] = str(next_token) return json.dumps(response) - def describe_user_pool(self): + def describe_user_pool(self) -> str: user_pool_id = self._get_param("UserPoolId") user_pool = self.backend.describe_user_pool(user_pool_id) return json.dumps({"UserPool": user_pool.to_json(extended=True)}) - def update_user_pool(self): + def update_user_pool(self) -> None: user_pool_id = self._get_param("UserPoolId") self.backend.update_user_pool(user_pool_id, self.parameters) - def delete_user_pool(self): + def delete_user_pool(self) -> str: user_pool_id = self._get_param("UserPoolId") self.backend.delete_user_pool(user_pool_id) return "" # User pool domain - def create_user_pool_domain(self): + def create_user_pool_domain(self) -> str: domain = self._get_param("Domain") user_pool_id = self._get_param("UserPoolId") custom_domain_config = self._get_param("CustomDomainConfig") @@ -103,21 +107,21 @@ class CognitoIdpResponse(BaseResponse): return json.dumps(domain_description) return "" - def describe_user_pool_domain(self): + def describe_user_pool_domain(self) -> str: domain = self._get_param("Domain") user_pool_domain = self.backend.describe_user_pool_domain(domain) - domain_description = {} + domain_description: Dict[str, Any] = {} if user_pool_domain: domain_description = user_pool_domain.to_json() return json.dumps({"DomainDescription": domain_description}) - def delete_user_pool_domain(self): + def delete_user_pool_domain(self) -> str: domain = self._get_param("Domain") self.backend.delete_user_pool_domain(domain) return "" - def update_user_pool_domain(self): + def update_user_pool_domain(self) -> str: domain = self._get_param("Domain") custom_domain_config = self._get_param("CustomDomainConfig") user_pool_domain = self.backend.update_user_pool_domain( @@ -129,7 +133,7 @@ class CognitoIdpResponse(BaseResponse): return "" # User pool client - def create_user_pool_client(self): + def create_user_pool_client(self) -> str: user_pool_id = self.parameters.pop("UserPoolId") generate_secret = self.parameters.pop("GenerateSecret", False) user_pool_client = self.backend.create_user_pool_client( @@ -137,14 +141,14 @@ class CognitoIdpResponse(BaseResponse): ) return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)}) - def list_user_pool_clients(self): + def list_user_pool_clients(self) -> str: user_pool_id = self._get_param("UserPoolId") max_results = self._get_param("MaxResults") next_token = self._get_param("NextToken") user_pool_clients, next_token = self.backend.list_user_pool_clients( user_pool_id, max_results=max_results, next_token=next_token ) - response = { + response: Dict[str, Any] = { "UserPoolClients": [ user_pool_client.to_json() for user_pool_client in user_pool_clients ] @@ -153,7 +157,7 @@ class CognitoIdpResponse(BaseResponse): response["NextToken"] = str(next_token) return json.dumps(response) - def describe_user_pool_client(self): + def describe_user_pool_client(self) -> str: user_pool_id = self._get_param("UserPoolId") client_id = self._get_param("ClientId") user_pool_client = self.backend.describe_user_pool_client( @@ -161,7 +165,7 @@ class CognitoIdpResponse(BaseResponse): ) return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)}) - def update_user_pool_client(self): + def update_user_pool_client(self) -> str: user_pool_id = self.parameters.pop("UserPoolId") client_id = self.parameters.pop("ClientId") user_pool_client = self.backend.update_user_pool_client( @@ -169,14 +173,14 @@ class CognitoIdpResponse(BaseResponse): ) return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)}) - def delete_user_pool_client(self): + def delete_user_pool_client(self) -> str: user_pool_id = self._get_param("UserPoolId") client_id = self._get_param("ClientId") self.backend.delete_user_pool_client(user_pool_id, client_id) return "" # Identity provider - def create_identity_provider(self): + def create_identity_provider(self) -> str: user_pool_id = self._get_param("UserPoolId") name = self.parameters.pop("ProviderName") identity_provider = self.backend.create_identity_provider( @@ -186,14 +190,14 @@ class CognitoIdpResponse(BaseResponse): {"IdentityProvider": identity_provider.to_json(extended=True)} ) - def list_identity_providers(self): + def list_identity_providers(self) -> str: user_pool_id = self._get_param("UserPoolId") max_results = self._get_param("MaxResults") next_token = self._get_param("NextToken") identity_providers, next_token = self.backend.list_identity_providers( user_pool_id, max_results=max_results, next_token=next_token ) - response = { + response: Dict[str, Any] = { "Providers": [ identity_provider.to_json() for identity_provider in identity_providers ] @@ -202,7 +206,7 @@ class CognitoIdpResponse(BaseResponse): response["NextToken"] = str(next_token) return json.dumps(response) - def describe_identity_provider(self): + def describe_identity_provider(self) -> str: user_pool_id = self._get_param("UserPoolId") name = self._get_param("ProviderName") identity_provider = self.backend.describe_identity_provider(user_pool_id, name) @@ -210,7 +214,7 @@ class CognitoIdpResponse(BaseResponse): {"IdentityProvider": identity_provider.to_json(extended=True)} ) - def update_identity_provider(self): + def update_identity_provider(self) -> str: user_pool_id = self._get_param("UserPoolId") name = self._get_param("ProviderName") identity_provider = self.backend.update_identity_provider( @@ -220,14 +224,14 @@ class CognitoIdpResponse(BaseResponse): {"IdentityProvider": identity_provider.to_json(extended=True)} ) - def delete_identity_provider(self): + def delete_identity_provider(self) -> str: user_pool_id = self._get_param("UserPoolId") name = self._get_param("ProviderName") self.backend.delete_identity_provider(user_pool_id, name) return "" # Group - def create_group(self): + def create_group(self) -> str: group_name = self._get_param("GroupName") user_pool_id = self._get_param("UserPoolId") description = self._get_param("Description") @@ -240,13 +244,13 @@ class CognitoIdpResponse(BaseResponse): return json.dumps({"Group": group.to_json()}) - def get_group(self): + def get_group(self) -> str: group_name = self._get_param("GroupName") user_pool_id = self._get_param("UserPoolId") group = self.backend.get_group(user_pool_id, group_name) return json.dumps({"Group": group.to_json()}) - def list_groups(self): + def list_groups(self) -> str: user_pool_id = self._get_param("UserPoolId") limit = self._get_param("Limit") token = self._get_param("NextToken") @@ -258,13 +262,13 @@ class CognitoIdpResponse(BaseResponse): response["NextToken"] = token return json.dumps(response) - def delete_group(self): + def delete_group(self) -> str: group_name = self._get_param("GroupName") user_pool_id = self._get_param("UserPoolId") self.backend.delete_group(user_pool_id, group_name) return "" - def update_group(self): + def update_group(self) -> str: group_name = self._get_param("GroupName") user_pool_id = self._get_param("UserPoolId") description = self._get_param("Description") @@ -277,7 +281,7 @@ class CognitoIdpResponse(BaseResponse): return json.dumps({"Group": group.to_json()}) - def admin_add_user_to_group(self): + def admin_add_user_to_group(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") group_name = self._get_param("GroupName") @@ -286,7 +290,7 @@ class CognitoIdpResponse(BaseResponse): return "" - def list_users_in_group(self): + def list_users_in_group(self) -> str: user_pool_id = self._get_param("UserPoolId") group_name = self._get_param("GroupName") limit = self._get_param("Limit") @@ -299,13 +303,13 @@ class CognitoIdpResponse(BaseResponse): response["NextToken"] = token return json.dumps(response) - def admin_list_groups_for_user(self): + def admin_list_groups_for_user(self) -> str: username = self._get_param("Username") user_pool_id = self._get_param("UserPoolId") groups = self.backend.admin_list_groups_for_user(user_pool_id, username) return json.dumps({"Groups": [group.to_json() for group in groups]}) - def admin_remove_user_from_group(self): + def admin_remove_user_from_group(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") group_name = self._get_param("GroupName") @@ -314,14 +318,14 @@ class CognitoIdpResponse(BaseResponse): return "" - def admin_reset_user_password(self): + def admin_reset_user_password(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") self.backend.admin_reset_user_password(user_pool_id, username) return "" # User - def admin_create_user(self): + def admin_create_user(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") message_action = self._get_param("MessageAction") @@ -336,23 +340,23 @@ class CognitoIdpResponse(BaseResponse): return json.dumps({"User": user.to_json(extended=True)}) - def admin_confirm_sign_up(self): + def admin_confirm_sign_up(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") return self.backend.admin_confirm_sign_up(user_pool_id, username) - def admin_get_user(self): + def admin_get_user(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") user = self.backend.admin_get_user(user_pool_id, username) return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes")) - def get_user(self): + def get_user(self) -> str: access_token = self._get_param("AccessToken") user = region_agnostic_backend.get_user(access_token=access_token) return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes")) - def list_users(self): + def list_users(self) -> str: user_pool_id = self._get_param("UserPoolId") limit = self._get_param("Limit") token = self._get_param("PaginationToken") @@ -362,12 +366,15 @@ class CognitoIdpResponse(BaseResponse): user_pool_id, limit=limit, pagination_token=token ) if filt: - inherent_attributes = { + inherent_attributes: Dict[str, Any] = { "cognito:user_status": lambda u: u.status, "status": lambda u: "Enabled" if u.enabled else "Disabled", "username": lambda u: u.username, } - comparisons = {"=": lambda x, y: x == y, "^=": lambda x, y: x.startswith(y)} + comparisons: Dict[str, Any] = { + "=": lambda x, y: x == y, + "^=": lambda x, y: x.startswith(y), + } allowed_attributes = [ "username", "email", @@ -402,7 +409,7 @@ class CognitoIdpResponse(BaseResponse): and compare(inherent_attributes[name](user), value) ) ] - response = { + response: Dict[str, Any] = { "Users": [ user.to_json(extended=True, attributes_to_get=attributes_to_get) for user in users @@ -412,25 +419,25 @@ class CognitoIdpResponse(BaseResponse): response["PaginationToken"] = str(token) return json.dumps(response) - def admin_disable_user(self): + def admin_disable_user(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") self.backend.admin_disable_user(user_pool_id, username) return "" - def admin_enable_user(self): + def admin_enable_user(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") self.backend.admin_enable_user(user_pool_id, username) return "" - def admin_delete_user(self): + def admin_delete_user(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") self.backend.admin_delete_user(user_pool_id, username) return "" - def admin_initiate_auth(self): + def admin_initiate_auth(self) -> str: user_pool_id = self._get_param("UserPoolId") client_id = self._get_param("ClientId") auth_flow = self._get_param("AuthFlow") @@ -442,7 +449,7 @@ class CognitoIdpResponse(BaseResponse): return json.dumps(auth_result) - def respond_to_auth_challenge(self): + def respond_to_auth_challenge(self) -> str: session = self._get_param("Session") client_id = self._get_param("ClientId") challenge_name = self._get_param("ChallengeName") @@ -453,7 +460,7 @@ class CognitoIdpResponse(BaseResponse): return json.dumps(auth_result) - def forgot_password(self): + def forgot_password(self) -> str: client_id = self._get_param("ClientId") username = self._get_param("Username") account, region = find_account_region_by_value("client_id", client_id) @@ -469,7 +476,7 @@ class CognitoIdpResponse(BaseResponse): # on localhost (doesn't get a region in the host header), it doesn't know what # region's backend should handle the traffic, and we use `find_region_by_value` to # solve that problem. - def confirm_forgot_password(self): + def confirm_forgot_password(self) -> str: client_id = self._get_param("ClientId") username = self._get_param("Username") password = self._get_param("Password") @@ -481,7 +488,7 @@ class CognitoIdpResponse(BaseResponse): return "" # Ditto the comment on confirm_forgot_password. - def change_password(self): + def change_password(self) -> str: access_token = self._get_param("AccessToken") previous_password = self._get_param("PreviousPassword") proposed_password = self._get_param("ProposedPassword") @@ -491,33 +498,33 @@ class CognitoIdpResponse(BaseResponse): ) return "" - def admin_update_user_attributes(self): + def admin_update_user_attributes(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") attributes = self._get_param("UserAttributes") self.backend.admin_update_user_attributes(user_pool_id, username, attributes) return "" - def admin_delete_user_attributes(self): + def admin_delete_user_attributes(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") attributes = self._get_param("UserAttributeNames") self.backend.admin_delete_user_attributes(user_pool_id, username, attributes) return "" - def admin_user_global_sign_out(self): + def admin_user_global_sign_out(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") self.backend.admin_user_global_sign_out(user_pool_id, username) return "" - def global_sign_out(self): + def global_sign_out(self) -> str: access_token = self._get_param("AccessToken") self.backend.global_sign_out(access_token) return "" # Resource Server - def create_resource_server(self): + def create_resource_server(self) -> str: user_pool_id = self._get_param("UserPoolId") identifier = self._get_param("Identifier") name = self._get_param("Name") @@ -527,7 +534,7 @@ class CognitoIdpResponse(BaseResponse): ) return json.dumps({"ResourceServer": resource_server.to_json()}) - def sign_up(self): + def sign_up(self) -> str: client_id = self._get_param("ClientId") username = self._get_param("Username") password = self._get_param("Password") @@ -544,13 +551,13 @@ class CognitoIdpResponse(BaseResponse): } ) - def confirm_sign_up(self): + def confirm_sign_up(self) -> str: client_id = self._get_param("ClientId") username = self._get_param("Username") region_agnostic_backend.confirm_sign_up(client_id=client_id, username=username) return "" - def initiate_auth(self): + def initiate_auth(self) -> str: client_id = self._get_param("ClientId") auth_flow = self._get_param("AuthFlow") auth_parameters = self._get_param("AuthParameters") @@ -561,17 +568,17 @@ class CognitoIdpResponse(BaseResponse): return json.dumps(auth_result) - def associate_software_token(self): + def associate_software_token(self) -> str: access_token = self._get_param("AccessToken") result = self.backend.associate_software_token(access_token) return json.dumps(result) - def verify_software_token(self): + def verify_software_token(self) -> str: access_token = self._get_param("AccessToken") result = self.backend.verify_software_token(access_token) return json.dumps(result) - def set_user_mfa_preference(self): + def set_user_mfa_preference(self) -> str: access_token = self._get_param("AccessToken") software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings") sms_mfa_settings = self._get_param("SMSMfaSettings") @@ -580,7 +587,7 @@ class CognitoIdpResponse(BaseResponse): ) return "" - def admin_set_user_mfa_preference(self): + def admin_set_user_mfa_preference(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings") @@ -590,7 +597,7 @@ class CognitoIdpResponse(BaseResponse): ) return "" - def admin_set_user_password(self): + def admin_set_user_password(self) -> str: user_pool_id = self._get_param("UserPoolId") username = self._get_param("Username") password = self._get_param("Password") @@ -600,13 +607,13 @@ class CognitoIdpResponse(BaseResponse): ) return "" - def add_custom_attributes(self): + def add_custom_attributes(self) -> str: user_pool_id = self._get_param("UserPoolId") custom_attributes = self._get_param("CustomAttributes") self.backend.add_custom_attributes(user_pool_id, custom_attributes) return "" - def update_user_attributes(self): + def update_user_attributes(self) -> str: access_token = self._get_param("AccessToken") attributes = self._get_param("UserAttributes") self.backend.update_user_attributes(access_token, attributes) @@ -614,13 +621,16 @@ class CognitoIdpResponse(BaseResponse): class CognitoIdpJsonWebKeyResponse(BaseResponse): - def __init__(self): + def __init__(self) -> None: with open( os.path.join(os.path.dirname(__file__), "resources/jwks-public.json") ) as f: self.json_web_key = f.read() def serve_json_web_key( - self, request, full_url, headers - ): # pylint: disable=unused-argument + self, + request: Any, # pylint: disable=unused-argument + full_url: str, # pylint: disable=unused-argument + headers: Any, # pylint: disable=unused-argument + ) -> Tuple[int, Dict[str, str], str]: return 200, {"Content-Type": "application/json"}, self.json_web_key diff --git a/moto/cognitoidp/utils.py b/moto/cognitoidp/utils.py index f1a594d27..2a9f8daa9 100644 --- a/moto/cognitoidp/utils.py +++ b/moto/cognitoidp/utils.py @@ -4,6 +4,7 @@ import hmac import base64 import re from moto.moto_api._internal import mock_random as random +from typing import Any, Dict, List, Optional FORMATS = { "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", @@ -51,13 +52,18 @@ PAGINATION_MODEL = { } -def create_id(): +def create_id() -> str: size = 26 chars = list(range(10)) + list(string.ascii_lowercase) return "".join(str(random.choice(chars)) for x in range(size)) -def check_secret_hash(app_client_secret, app_client_id, username, secret_hash): +def check_secret_hash( + app_client_secret: str, + app_client_id: str, + username: str, + secret_hash: Optional[str], +) -> bool: key = bytes(str(app_client_secret).encode("latin-1")) msg = bytes(str(username + app_client_id).encode("latin-1")) new_digest = hmac.new(key, msg, hashlib.sha256).digest() @@ -65,36 +71,36 @@ def check_secret_hash(app_client_secret, app_client_id, username, secret_hash): return SECRET_HASH == secret_hash -def validate_username_format(username, _format="email"): +def validate_username_format(username: str, _format: str = "email") -> bool: # if the value of the `_format` param other than `email` or `phone_number`, # the default value for the regex will match nothing and the # method will return None - return re.fullmatch(FORMATS.get(_format, r"a^"), username) + return re.fullmatch(FORMATS.get(_format, r"a^"), username) is not None -def flatten_attrs(attrs): +def flatten_attrs(attrs: List[Dict[str, Any]]) -> Dict[str, Any]: return {attr["Name"]: attr["Value"] for attr in attrs} -def expand_attrs(attrs): +def expand_attrs(attrs: Dict[str, Any]) -> List[Dict[str, Any]]: return [{"Name": k, "Value": v} for k, v in attrs.items()] ID_HASH_STRATEGY = "HASH" -def generate_id(strategy, *args): +def generate_id(strategy: Optional[str], *args: Any) -> str: if strategy == ID_HASH_STRATEGY: return _generate_id_hash(args) else: return _generate_id_uuid() -def _generate_id_uuid(): +def _generate_id_uuid() -> str: return random.uuid4().hex -def _generate_id_hash(args): +def _generate_id_hash(args: Any) -> str: hasher = hashlib.sha256() for arg in args: diff --git a/moto/settings.py b/moto/settings.py index 9d9a926f5..a7c52f8be 100644 --- a/moto/settings.py +++ b/moto/settings.py @@ -3,6 +3,7 @@ import os import pathlib from functools import lru_cache +from typing import Optional TEST_SERVER_MODE = os.environ.get("TEST_SERVER_MODE", "0").lower() == "true" @@ -130,5 +131,5 @@ def get_docker_host() -> str: return "http://host.docker.internal" -def get_cognito_idp_user_pool_id_strategy(): +def get_cognito_idp_user_pool_id_strategy() -> Optional[str]: return os.environ.get("MOTO_COGNITO_IDP_USER_POOL_ID_STRATEGY") diff --git a/moto/utilities/paginator.py b/moto/utilities/paginator.py index bbcb2dbe7..983bc8ef4 100644 --- a/moto/utilities/paginator.py +++ b/moto/utilities/paginator.py @@ -2,13 +2,16 @@ import inspect from copy import deepcopy from functools import wraps +from typing import Dict, Any, Callable from botocore.paginate import TokenDecoder, TokenEncoder from moto.core.exceptions import InvalidToken -def paginate(pagination_model, original_function=None): +def paginate( + pagination_model: Dict[str, Any], original_function: Callable = None +) -> Callable: def pagination_decorator(func): @wraps(func) def pagination_wrapper(*args, **kwargs): diff --git a/moto/utilities/utils.py b/moto/utilities/utils.py index be6bca8ca..d010daa5e 100644 --- a/moto/utilities/utils.py +++ b/moto/utilities/utils.py @@ -48,7 +48,7 @@ def filter_resources(resources, filters, attr_pairs): return result -def md5_hash(data=None): +def md5_hash(data: Any = None) -> Any: """ MD5-hashing for non-security usecases. Required for Moto to work in FIPS-enabled systems diff --git a/setup.cfg b/setup.cfg index 6638eb37d..dafc3801a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,7 @@ disable = W,C,R,E enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import [mypy] -files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit +files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit,moto/cognito* show_column_numbers=True show_error_codes = True disable_error_code=abstract