Techdebt: MyPy Cognito (#5626)

This commit is contained in:
Bert Blommers 2022-11-01 09:33:01 -01:00 committed by GitHub
parent f185999602
commit 78bf4a82d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 534 additions and 355 deletions

View File

@ -2,7 +2,7 @@ from moto.core.exceptions import JsonRESTError
class ResourceNotFoundError(JsonRESTError): class ResourceNotFoundError(JsonRESTError):
def __init__(self, message): def __init__(self, message: str):
super().__init__(error_type="ResourceNotFoundException", message=message) super().__init__(error_type="ResourceNotFoundException", message=message)
@ -10,6 +10,6 @@ class InvalidNameException(JsonRESTError):
message = "1 validation error detected: Value '{}' at 'identityPoolName' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w\\s+=,.@-]+" message = "1 validation error detected: Value '{}' at 'identityPoolName' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\w\\s+=,.@-]+"
def __init__(self, name): def __init__(self, name: str):
msg = InvalidNameException.message.format(name) msg = InvalidNameException.message.format(name)
super().__init__(error_type="ValidationException", message=msg) super().__init__(error_type="ValidationException", message=msg)

View File

@ -3,14 +3,15 @@ import json
import re import re
from collections import OrderedDict from collections import OrderedDict
from typing import Any, Dict, List, Optional
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict
from .exceptions import InvalidNameException, ResourceNotFoundError from .exceptions import InvalidNameException, ResourceNotFoundError
from .utils import get_random_identity_id from .utils import get_random_identity_id
class CognitoIdentity(BaseModel): class CognitoIdentityPool(BaseModel):
def __init__(self, region, identity_pool_name, **kwargs): def __init__(self, region: str, identity_pool_name: str, **kwargs: Any):
self.identity_pool_name = identity_pool_name self.identity_pool_name = identity_pool_name
if not re.fullmatch(r"[\w\s+=,.@-]+", identity_pool_name): if not re.fullmatch(r"[\w\s+=,.@-]+", identity_pool_name):
@ -32,7 +33,7 @@ class CognitoIdentity(BaseModel):
self.tags = kwargs.get("tags") or {} self.tags = kwargs.get("tags") or {}
def to_json(self): def to_json(self) -> str:
return json.dumps( return json.dumps(
{ {
"IdentityPoolId": self.identity_pool_id, "IdentityPoolId": self.identity_pool_id,
@ -43,50 +44,37 @@ class CognitoIdentity(BaseModel):
"OpenIdConnectProviderARNs": self.open_id_connect_provider_arns, "OpenIdConnectProviderARNs": self.open_id_connect_provider_arns,
"CognitoIdentityProviders": self.cognito_identity_providers, "CognitoIdentityProviders": self.cognito_identity_providers,
"SamlProviderARNs": self.saml_provider_arns, "SamlProviderARNs": self.saml_provider_arns,
"IdentityPoolTags": self.tags,
} }
) )
class CognitoIdentityBackend(BaseBackend): class CognitoIdentityBackend(BaseBackend):
def __init__(self, region_name, account_id): def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id) super().__init__(region_name, account_id)
self.identity_pools = OrderedDict() self.identity_pools: Dict[str, CognitoIdentityPool] = OrderedDict()
self.pools_identities = {} self.pools_identities: Dict[str, Dict[str, Any]] = {}
def describe_identity_pool(self, identity_pool_id): def describe_identity_pool(self, identity_pool_id: str) -> str:
identity_pool = self.identity_pools.get(identity_pool_id, None) identity_pool = self.identity_pools.get(identity_pool_id, None)
if not identity_pool: if not identity_pool:
raise ResourceNotFoundError(identity_pool_id) raise ResourceNotFoundError(identity_pool_id)
response = json.dumps( return identity_pool.to_json()
{
"AllowUnauthenticatedIdentities": identity_pool.allow_unauthenticated_identities,
"CognitoIdentityProviders": identity_pool.cognito_identity_providers,
"DeveloperProviderName": identity_pool.developer_provider_name,
"IdentityPoolId": identity_pool.identity_pool_id,
"IdentityPoolName": identity_pool.identity_pool_name,
"IdentityPoolTags": identity_pool.tags,
"OpenIdConnectProviderARNs": identity_pool.open_id_connect_provider_arns,
"SamlProviderARNs": identity_pool.saml_provider_arns,
"SupportedLoginProviders": identity_pool.supported_login_providers,
}
)
return response
def create_identity_pool( def create_identity_pool(
self, self,
identity_pool_name, identity_pool_name: str,
allow_unauthenticated_identities, allow_unauthenticated_identities: bool,
supported_login_providers, supported_login_providers: Dict[str, str],
developer_provider_name, developer_provider_name: str,
open_id_connect_provider_arns, open_id_connect_provider_arns: List[str],
cognito_identity_providers, cognito_identity_providers: List[Dict[str, Any]],
saml_provider_arns, saml_provider_arns: List[str],
tags=None, tags: Dict[str, str],
): ) -> str:
new_identity = CognitoIdentity( new_identity = CognitoIdentityPool(
self.region_name, self.region_name,
identity_pool_name, identity_pool_name,
allow_unauthenticated_identities=allow_unauthenticated_identities, allow_unauthenticated_identities=allow_unauthenticated_identities,
@ -106,21 +94,20 @@ class CognitoIdentityBackend(BaseBackend):
} }
} }
) )
response = new_identity.to_json() return new_identity.to_json()
return response
def update_identity_pool( def update_identity_pool(
self, self,
identity_pool_id, identity_pool_id: str,
identity_pool_name, identity_pool_name: str,
allow_unauthenticated, allow_unauthenticated: Optional[bool],
login_providers, login_providers: Optional[Dict[str, str]],
provider_name, provider_name: Optional[str],
provider_arns, provider_arns: Optional[List[str]],
identity_providers, identity_providers: Optional[List[Dict[str, Any]]],
saml_providers, saml_providers: Optional[List[str]],
tags=None, tags: Optional[Dict[str, str]],
): ) -> str:
""" """
The AllowClassic-parameter has not yet been implemented The AllowClassic-parameter has not yet been implemented
""" """
@ -141,20 +128,19 @@ class CognitoIdentityBackend(BaseBackend):
if tags: if tags:
pool.tags = tags pool.tags = tags
response = pool.to_json() return pool.to_json()
return response
def get_id(self, identity_pool_id: str): def get_id(self, identity_pool_id: str) -> str:
identity_id = {"IdentityId": get_random_identity_id(self.region_name)} identity_id = {"IdentityId": get_random_identity_id(self.region_name)}
self.pools_identities[identity_pool_id]["Identities"].append(identity_id) self.pools_identities[identity_pool_id]["Identities"].append(identity_id)
return json.dumps(identity_id) return json.dumps(identity_id)
def get_credentials_for_identity(self, identity_id): def get_credentials_for_identity(self, identity_id: str) -> str:
duration = 90 duration = 90
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
expiration = now + datetime.timedelta(seconds=duration) expiration = now + datetime.timedelta(seconds=duration)
expiration_str = str(iso_8601_datetime_with_milliseconds(expiration)) expiration_str = str(iso_8601_datetime_with_milliseconds(expiration))
response = json.dumps( return json.dumps(
{ {
"Credentials": { "Credentials": {
"AccessKeyId": "TESTACCESSKEY12345", "AccessKeyId": "TESTACCESSKEY12345",
@ -165,32 +151,28 @@ class CognitoIdentityBackend(BaseBackend):
"IdentityId": identity_id, "IdentityId": identity_id,
} }
) )
return response
def get_open_id_token_for_developer_identity(self, identity_id): def get_open_id_token_for_developer_identity(self, identity_id: str) -> str:
response = json.dumps( return json.dumps(
{ {
"IdentityId": identity_id, "IdentityId": identity_id,
"Token": get_random_identity_id(self.region_name), "Token": get_random_identity_id(self.region_name),
} }
) )
return response
def get_open_id_token(self, identity_id): def get_open_id_token(self, identity_id: str) -> str:
response = json.dumps( return json.dumps(
{ {
"IdentityId": identity_id, "IdentityId": identity_id,
"Token": get_random_identity_id(self.region_name), "Token": get_random_identity_id(self.region_name),
} }
) )
return response
def list_identities(self, identity_pool_id): def list_identities(self, identity_pool_id: str) -> str:
""" """
The MaxResults-parameter has not yet been implemented The MaxResults-parameter has not yet been implemented
""" """
response = json.dumps(self.pools_identities[identity_pool_id]) return json.dumps(self.pools_identities[identity_pool_id])
return response
cognitoidentity_backends = BackendDict(CognitoIdentityBackend, "cognito-identity") cognitoidentity_backends = BackendDict(CognitoIdentityBackend, "cognito-identity")

View File

@ -1,17 +1,17 @@
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import cognitoidentity_backends from .models import cognitoidentity_backends, CognitoIdentityBackend
from .utils import get_random_identity_id from .utils import get_random_identity_id
class CognitoIdentityResponse(BaseResponse): class CognitoIdentityResponse(BaseResponse):
def __init__(self): def __init__(self) -> None:
super().__init__(service_name="cognito-identity") super().__init__(service_name="cognito-identity")
@property @property
def backend(self): def backend(self) -> CognitoIdentityBackend:
return cognitoidentity_backends[self.current_account][self.region] return cognitoidentity_backends[self.current_account][self.region]
def create_identity_pool(self): def create_identity_pool(self) -> str:
identity_pool_name = self._get_param("IdentityPoolName") identity_pool_name = self._get_param("IdentityPoolName")
allow_unauthenticated_identities = self._get_param( allow_unauthenticated_identities = self._get_param(
"AllowUnauthenticatedIdentities" "AllowUnauthenticatedIdentities"
@ -34,7 +34,7 @@ class CognitoIdentityResponse(BaseResponse):
tags=pool_tags, tags=pool_tags,
) )
def update_identity_pool(self): def update_identity_pool(self) -> str:
pool_id = self._get_param("IdentityPoolId") pool_id = self._get_param("IdentityPoolId")
pool_name = self._get_param("IdentityPoolName") pool_name = self._get_param("IdentityPoolName")
allow_unauthenticated = self._get_bool_param("AllowUnauthenticatedIdentities") allow_unauthenticated = self._get_bool_param("AllowUnauthenticatedIdentities")
@ -57,26 +57,26 @@ class CognitoIdentityResponse(BaseResponse):
tags=pool_tags, tags=pool_tags,
) )
def get_id(self): def get_id(self) -> str:
return self.backend.get_id(identity_pool_id=self._get_param("IdentityPoolId")) return self.backend.get_id(identity_pool_id=self._get_param("IdentityPoolId"))
def describe_identity_pool(self): def describe_identity_pool(self) -> str:
return self.backend.describe_identity_pool(self._get_param("IdentityPoolId")) return self.backend.describe_identity_pool(self._get_param("IdentityPoolId"))
def get_credentials_for_identity(self): def get_credentials_for_identity(self) -> str:
return self.backend.get_credentials_for_identity(self._get_param("IdentityId")) return self.backend.get_credentials_for_identity(self._get_param("IdentityId"))
def get_open_id_token_for_developer_identity(self): def get_open_id_token_for_developer_identity(self) -> str:
return self.backend.get_open_id_token_for_developer_identity( return self.backend.get_open_id_token_for_developer_identity(
self._get_param("IdentityId") or get_random_identity_id(self.region) self._get_param("IdentityId") or get_random_identity_id(self.region)
) )
def get_open_id_token(self): def get_open_id_token(self) -> str:
return self.backend.get_open_id_token( return self.backend.get_open_id_token(
self._get_param("IdentityId") or get_random_identity_id(self.region) self._get_param("IdentityId") or get_random_identity_id(self.region)
) )
def list_identities(self): def list_identities(self) -> str:
return self.backend.list_identities( return self.backend.list_identities(
self._get_param("IdentityPoolId") or get_random_identity_id(self.region) self._get_param("IdentityPoolId") or get_random_identity_id(self.region)
) )

View File

@ -1,5 +1,5 @@
from moto.moto_api._internal import mock_random from moto.moto_api._internal import mock_random
def get_random_identity_id(region): def get_random_identity_id(region: str) -> str:
return "{0}:{1}".format(region, mock_random.uuid4()) return "{0}:{1}".format(region, mock_random.uuid4())

View File

@ -1,43 +1,44 @@
from moto.core.exceptions import JsonRESTError from moto.core.exceptions import JsonRESTError
from typing import Optional
class ResourceNotFoundError(JsonRESTError): class ResourceNotFoundError(JsonRESTError):
def __init__(self, message): def __init__(self, message: Optional[str]):
super().__init__(error_type="ResourceNotFoundException", message=message) super().__init__(error_type="ResourceNotFoundException", message=message or "")
class UserNotFoundError(JsonRESTError): class UserNotFoundError(JsonRESTError):
def __init__(self, message): def __init__(self, message: str):
super().__init__(error_type="UserNotFoundException", message=message) super().__init__(error_type="UserNotFoundException", message=message)
class UsernameExistsException(JsonRESTError): class UsernameExistsException(JsonRESTError):
def __init__(self, message): def __init__(self, message: str):
super().__init__(error_type="UsernameExistsException", message=message) super().__init__(error_type="UsernameExistsException", message=message)
class GroupExistsException(JsonRESTError): class GroupExistsException(JsonRESTError):
def __init__(self, message): def __init__(self, message: str):
super().__init__(error_type="GroupExistsException", message=message) super().__init__(error_type="GroupExistsException", message=message)
class NotAuthorizedError(JsonRESTError): class NotAuthorizedError(JsonRESTError):
def __init__(self, message): def __init__(self, message: Optional[str]):
super().__init__(error_type="NotAuthorizedException", message=message) super().__init__(error_type="NotAuthorizedException", message=message or "")
class UserNotConfirmedException(JsonRESTError): class UserNotConfirmedException(JsonRESTError):
def __init__(self, message): def __init__(self, message: str):
super().__init__(error_type="UserNotConfirmedException", message=message) super().__init__(error_type="UserNotConfirmedException", message=message)
class ExpiredCodeException(JsonRESTError): class ExpiredCodeException(JsonRESTError):
def __init__(self, message): def __init__(self, message: str):
super().__init__(error_type="ExpiredCodeException", message=message) super().__init__(error_type="ExpiredCodeException", message=message)
class InvalidParameterException(JsonRESTError): class InvalidParameterException(JsonRESTError):
def __init__(self, msg=None): def __init__(self, msg: Optional[str] = None):
self.code = 400 self.code = 400
super().__init__( super().__init__(
"InvalidParameterException", msg or "A parameter is specified incorrectly." "InvalidParameterException", msg or "A parameter is specified incorrectly."

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,7 @@
import json import json
import os import os
import re import re
from typing import Any, Dict, Tuple
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from .models import ( from .models import (
@ -8,6 +9,7 @@ from .models import (
find_account_region_by_value, find_account_region_by_value,
RegionAgnosticBackend, RegionAgnosticBackend,
UserStatus, UserStatus,
CognitoIdpBackend,
) )
from .exceptions import InvalidParameterException from .exceptions import InvalidParameterException
@ -16,24 +18,24 @@ region_agnostic_backend = RegionAgnosticBackend()
class CognitoIdpResponse(BaseResponse): class CognitoIdpResponse(BaseResponse):
def __init__(self): def __init__(self) -> None:
super().__init__(service_name="cognito-idp") super().__init__(service_name="cognito-idp")
@property @property
def parameters(self): def parameters(self) -> Dict[str, Any]: # type: ignore[misc]
return json.loads(self.body) return json.loads(self.body)
@property @property
def backend(self): def backend(self) -> CognitoIdpBackend:
return cognitoidp_backends[self.current_account][self.region] return cognitoidp_backends[self.current_account][self.region]
# User pool # User pool
def create_user_pool(self): def create_user_pool(self) -> str:
name = self.parameters.pop("PoolName") name = self.parameters.pop("PoolName")
user_pool = self.backend.create_user_pool(name, self.parameters) user_pool = self.backend.create_user_pool(name, self.parameters)
return json.dumps({"UserPool": user_pool.to_json(extended=True)}) return json.dumps({"UserPool": user_pool.to_json(extended=True)})
def set_user_pool_mfa_config(self): def set_user_pool_mfa_config(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
sms_config = self._get_param("SmsMfaConfiguration", None) sms_config = self._get_param("SmsMfaConfiguration", None)
token_config = self._get_param("SoftwareTokenMfaConfiguration", None) token_config = self._get_param("SoftwareTokenMfaConfiguration", None)
@ -60,38 +62,40 @@ class CognitoIdpResponse(BaseResponse):
) )
return json.dumps(response) return json.dumps(response)
def get_user_pool_mfa_config(self): def get_user_pool_mfa_config(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
response = self.backend.get_user_pool_mfa_config(user_pool_id) response = self.backend.get_user_pool_mfa_config(user_pool_id)
return json.dumps(response) return json.dumps(response)
def list_user_pools(self): def list_user_pools(self) -> str:
max_results = self._get_param("MaxResults") max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken") next_token = self._get_param("NextToken")
user_pools, next_token = self.backend.list_user_pools( user_pools, next_token = self.backend.list_user_pools(
max_results=max_results, next_token=next_token max_results=max_results, next_token=next_token
) )
response = {"UserPools": [user_pool.to_json() for user_pool in user_pools]} response: Dict[str, Any] = {
"UserPools": [user_pool.to_json() for user_pool in user_pools]
}
if next_token: if next_token:
response["NextToken"] = str(next_token) response["NextToken"] = str(next_token)
return json.dumps(response) return json.dumps(response)
def describe_user_pool(self): def describe_user_pool(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
user_pool = self.backend.describe_user_pool(user_pool_id) user_pool = self.backend.describe_user_pool(user_pool_id)
return json.dumps({"UserPool": user_pool.to_json(extended=True)}) return json.dumps({"UserPool": user_pool.to_json(extended=True)})
def update_user_pool(self): def update_user_pool(self) -> None:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
self.backend.update_user_pool(user_pool_id, self.parameters) self.backend.update_user_pool(user_pool_id, self.parameters)
def delete_user_pool(self): def delete_user_pool(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
self.backend.delete_user_pool(user_pool_id) self.backend.delete_user_pool(user_pool_id)
return "" return ""
# User pool domain # User pool domain
def create_user_pool_domain(self): def create_user_pool_domain(self) -> str:
domain = self._get_param("Domain") domain = self._get_param("Domain")
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
custom_domain_config = self._get_param("CustomDomainConfig") custom_domain_config = self._get_param("CustomDomainConfig")
@ -103,21 +107,21 @@ class CognitoIdpResponse(BaseResponse):
return json.dumps(domain_description) return json.dumps(domain_description)
return "" return ""
def describe_user_pool_domain(self): def describe_user_pool_domain(self) -> str:
domain = self._get_param("Domain") domain = self._get_param("Domain")
user_pool_domain = self.backend.describe_user_pool_domain(domain) user_pool_domain = self.backend.describe_user_pool_domain(domain)
domain_description = {} domain_description: Dict[str, Any] = {}
if user_pool_domain: if user_pool_domain:
domain_description = user_pool_domain.to_json() domain_description = user_pool_domain.to_json()
return json.dumps({"DomainDescription": domain_description}) return json.dumps({"DomainDescription": domain_description})
def delete_user_pool_domain(self): def delete_user_pool_domain(self) -> str:
domain = self._get_param("Domain") domain = self._get_param("Domain")
self.backend.delete_user_pool_domain(domain) self.backend.delete_user_pool_domain(domain)
return "" return ""
def update_user_pool_domain(self): def update_user_pool_domain(self) -> str:
domain = self._get_param("Domain") domain = self._get_param("Domain")
custom_domain_config = self._get_param("CustomDomainConfig") custom_domain_config = self._get_param("CustomDomainConfig")
user_pool_domain = self.backend.update_user_pool_domain( user_pool_domain = self.backend.update_user_pool_domain(
@ -129,7 +133,7 @@ class CognitoIdpResponse(BaseResponse):
return "" return ""
# User pool client # User pool client
def create_user_pool_client(self): def create_user_pool_client(self) -> str:
user_pool_id = self.parameters.pop("UserPoolId") user_pool_id = self.parameters.pop("UserPoolId")
generate_secret = self.parameters.pop("GenerateSecret", False) generate_secret = self.parameters.pop("GenerateSecret", False)
user_pool_client = self.backend.create_user_pool_client( user_pool_client = self.backend.create_user_pool_client(
@ -137,14 +141,14 @@ class CognitoIdpResponse(BaseResponse):
) )
return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)}) return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})
def list_user_pool_clients(self): def list_user_pool_clients(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
max_results = self._get_param("MaxResults") max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken") next_token = self._get_param("NextToken")
user_pool_clients, next_token = self.backend.list_user_pool_clients( user_pool_clients, next_token = self.backend.list_user_pool_clients(
user_pool_id, max_results=max_results, next_token=next_token user_pool_id, max_results=max_results, next_token=next_token
) )
response = { response: Dict[str, Any] = {
"UserPoolClients": [ "UserPoolClients": [
user_pool_client.to_json() for user_pool_client in user_pool_clients user_pool_client.to_json() for user_pool_client in user_pool_clients
] ]
@ -153,7 +157,7 @@ class CognitoIdpResponse(BaseResponse):
response["NextToken"] = str(next_token) response["NextToken"] = str(next_token)
return json.dumps(response) return json.dumps(response)
def describe_user_pool_client(self): def describe_user_pool_client(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
user_pool_client = self.backend.describe_user_pool_client( user_pool_client = self.backend.describe_user_pool_client(
@ -161,7 +165,7 @@ class CognitoIdpResponse(BaseResponse):
) )
return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)}) return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})
def update_user_pool_client(self): def update_user_pool_client(self) -> str:
user_pool_id = self.parameters.pop("UserPoolId") user_pool_id = self.parameters.pop("UserPoolId")
client_id = self.parameters.pop("ClientId") client_id = self.parameters.pop("ClientId")
user_pool_client = self.backend.update_user_pool_client( user_pool_client = self.backend.update_user_pool_client(
@ -169,14 +173,14 @@ class CognitoIdpResponse(BaseResponse):
) )
return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)}) return json.dumps({"UserPoolClient": user_pool_client.to_json(extended=True)})
def delete_user_pool_client(self): def delete_user_pool_client(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
self.backend.delete_user_pool_client(user_pool_id, client_id) self.backend.delete_user_pool_client(user_pool_id, client_id)
return "" return ""
# Identity provider # Identity provider
def create_identity_provider(self): def create_identity_provider(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
name = self.parameters.pop("ProviderName") name = self.parameters.pop("ProviderName")
identity_provider = self.backend.create_identity_provider( identity_provider = self.backend.create_identity_provider(
@ -186,14 +190,14 @@ class CognitoIdpResponse(BaseResponse):
{"IdentityProvider": identity_provider.to_json(extended=True)} {"IdentityProvider": identity_provider.to_json(extended=True)}
) )
def list_identity_providers(self): def list_identity_providers(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
max_results = self._get_param("MaxResults") max_results = self._get_param("MaxResults")
next_token = self._get_param("NextToken") next_token = self._get_param("NextToken")
identity_providers, next_token = self.backend.list_identity_providers( identity_providers, next_token = self.backend.list_identity_providers(
user_pool_id, max_results=max_results, next_token=next_token user_pool_id, max_results=max_results, next_token=next_token
) )
response = { response: Dict[str, Any] = {
"Providers": [ "Providers": [
identity_provider.to_json() for identity_provider in identity_providers identity_provider.to_json() for identity_provider in identity_providers
] ]
@ -202,7 +206,7 @@ class CognitoIdpResponse(BaseResponse):
response["NextToken"] = str(next_token) response["NextToken"] = str(next_token)
return json.dumps(response) return json.dumps(response)
def describe_identity_provider(self): def describe_identity_provider(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
name = self._get_param("ProviderName") name = self._get_param("ProviderName")
identity_provider = self.backend.describe_identity_provider(user_pool_id, name) identity_provider = self.backend.describe_identity_provider(user_pool_id, name)
@ -210,7 +214,7 @@ class CognitoIdpResponse(BaseResponse):
{"IdentityProvider": identity_provider.to_json(extended=True)} {"IdentityProvider": identity_provider.to_json(extended=True)}
) )
def update_identity_provider(self): def update_identity_provider(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
name = self._get_param("ProviderName") name = self._get_param("ProviderName")
identity_provider = self.backend.update_identity_provider( identity_provider = self.backend.update_identity_provider(
@ -220,14 +224,14 @@ class CognitoIdpResponse(BaseResponse):
{"IdentityProvider": identity_provider.to_json(extended=True)} {"IdentityProvider": identity_provider.to_json(extended=True)}
) )
def delete_identity_provider(self): def delete_identity_provider(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
name = self._get_param("ProviderName") name = self._get_param("ProviderName")
self.backend.delete_identity_provider(user_pool_id, name) self.backend.delete_identity_provider(user_pool_id, name)
return "" return ""
# Group # Group
def create_group(self): def create_group(self) -> str:
group_name = self._get_param("GroupName") group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
description = self._get_param("Description") description = self._get_param("Description")
@ -240,13 +244,13 @@ class CognitoIdpResponse(BaseResponse):
return json.dumps({"Group": group.to_json()}) return json.dumps({"Group": group.to_json()})
def get_group(self): def get_group(self) -> str:
group_name = self._get_param("GroupName") group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
group = self.backend.get_group(user_pool_id, group_name) group = self.backend.get_group(user_pool_id, group_name)
return json.dumps({"Group": group.to_json()}) return json.dumps({"Group": group.to_json()})
def list_groups(self): def list_groups(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
limit = self._get_param("Limit") limit = self._get_param("Limit")
token = self._get_param("NextToken") token = self._get_param("NextToken")
@ -258,13 +262,13 @@ class CognitoIdpResponse(BaseResponse):
response["NextToken"] = token response["NextToken"] = token
return json.dumps(response) return json.dumps(response)
def delete_group(self): def delete_group(self) -> str:
group_name = self._get_param("GroupName") group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
self.backend.delete_group(user_pool_id, group_name) self.backend.delete_group(user_pool_id, group_name)
return "" return ""
def update_group(self): def update_group(self) -> str:
group_name = self._get_param("GroupName") group_name = self._get_param("GroupName")
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
description = self._get_param("Description") description = self._get_param("Description")
@ -277,7 +281,7 @@ class CognitoIdpResponse(BaseResponse):
return json.dumps({"Group": group.to_json()}) return json.dumps({"Group": group.to_json()})
def admin_add_user_to_group(self): def admin_add_user_to_group(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
group_name = self._get_param("GroupName") group_name = self._get_param("GroupName")
@ -286,7 +290,7 @@ class CognitoIdpResponse(BaseResponse):
return "" return ""
def list_users_in_group(self): def list_users_in_group(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
group_name = self._get_param("GroupName") group_name = self._get_param("GroupName")
limit = self._get_param("Limit") limit = self._get_param("Limit")
@ -299,13 +303,13 @@ class CognitoIdpResponse(BaseResponse):
response["NextToken"] = token response["NextToken"] = token
return json.dumps(response) return json.dumps(response)
def admin_list_groups_for_user(self): def admin_list_groups_for_user(self) -> str:
username = self._get_param("Username") username = self._get_param("Username")
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
groups = self.backend.admin_list_groups_for_user(user_pool_id, username) groups = self.backend.admin_list_groups_for_user(user_pool_id, username)
return json.dumps({"Groups": [group.to_json() for group in groups]}) return json.dumps({"Groups": [group.to_json() for group in groups]})
def admin_remove_user_from_group(self): def admin_remove_user_from_group(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
group_name = self._get_param("GroupName") group_name = self._get_param("GroupName")
@ -314,14 +318,14 @@ class CognitoIdpResponse(BaseResponse):
return "" return ""
def admin_reset_user_password(self): def admin_reset_user_password(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
self.backend.admin_reset_user_password(user_pool_id, username) self.backend.admin_reset_user_password(user_pool_id, username)
return "" return ""
# User # User
def admin_create_user(self): def admin_create_user(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
message_action = self._get_param("MessageAction") message_action = self._get_param("MessageAction")
@ -336,23 +340,23 @@ class CognitoIdpResponse(BaseResponse):
return json.dumps({"User": user.to_json(extended=True)}) return json.dumps({"User": user.to_json(extended=True)})
def admin_confirm_sign_up(self): def admin_confirm_sign_up(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
return self.backend.admin_confirm_sign_up(user_pool_id, username) return self.backend.admin_confirm_sign_up(user_pool_id, username)
def admin_get_user(self): def admin_get_user(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
user = self.backend.admin_get_user(user_pool_id, username) user = self.backend.admin_get_user(user_pool_id, username)
return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes")) return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))
def get_user(self): def get_user(self) -> str:
access_token = self._get_param("AccessToken") access_token = self._get_param("AccessToken")
user = region_agnostic_backend.get_user(access_token=access_token) user = region_agnostic_backend.get_user(access_token=access_token)
return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes")) return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))
def list_users(self): def list_users(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
limit = self._get_param("Limit") limit = self._get_param("Limit")
token = self._get_param("PaginationToken") token = self._get_param("PaginationToken")
@ -362,12 +366,15 @@ class CognitoIdpResponse(BaseResponse):
user_pool_id, limit=limit, pagination_token=token user_pool_id, limit=limit, pagination_token=token
) )
if filt: if filt:
inherent_attributes = { inherent_attributes: Dict[str, Any] = {
"cognito:user_status": lambda u: u.status, "cognito:user_status": lambda u: u.status,
"status": lambda u: "Enabled" if u.enabled else "Disabled", "status": lambda u: "Enabled" if u.enabled else "Disabled",
"username": lambda u: u.username, "username": lambda u: u.username,
} }
comparisons = {"=": lambda x, y: x == y, "^=": lambda x, y: x.startswith(y)} comparisons: Dict[str, Any] = {
"=": lambda x, y: x == y,
"^=": lambda x, y: x.startswith(y),
}
allowed_attributes = [ allowed_attributes = [
"username", "username",
"email", "email",
@ -402,7 +409,7 @@ class CognitoIdpResponse(BaseResponse):
and compare(inherent_attributes[name](user), value) and compare(inherent_attributes[name](user), value)
) )
] ]
response = { response: Dict[str, Any] = {
"Users": [ "Users": [
user.to_json(extended=True, attributes_to_get=attributes_to_get) user.to_json(extended=True, attributes_to_get=attributes_to_get)
for user in users for user in users
@ -412,25 +419,25 @@ class CognitoIdpResponse(BaseResponse):
response["PaginationToken"] = str(token) response["PaginationToken"] = str(token)
return json.dumps(response) return json.dumps(response)
def admin_disable_user(self): def admin_disable_user(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
self.backend.admin_disable_user(user_pool_id, username) self.backend.admin_disable_user(user_pool_id, username)
return "" return ""
def admin_enable_user(self): def admin_enable_user(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
self.backend.admin_enable_user(user_pool_id, username) self.backend.admin_enable_user(user_pool_id, username)
return "" return ""
def admin_delete_user(self): def admin_delete_user(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
self.backend.admin_delete_user(user_pool_id, username) self.backend.admin_delete_user(user_pool_id, username)
return "" return ""
def admin_initiate_auth(self): def admin_initiate_auth(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
auth_flow = self._get_param("AuthFlow") auth_flow = self._get_param("AuthFlow")
@ -442,7 +449,7 @@ class CognitoIdpResponse(BaseResponse):
return json.dumps(auth_result) return json.dumps(auth_result)
def respond_to_auth_challenge(self): def respond_to_auth_challenge(self) -> str:
session = self._get_param("Session") session = self._get_param("Session")
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
challenge_name = self._get_param("ChallengeName") challenge_name = self._get_param("ChallengeName")
@ -453,7 +460,7 @@ class CognitoIdpResponse(BaseResponse):
return json.dumps(auth_result) return json.dumps(auth_result)
def forgot_password(self): def forgot_password(self) -> str:
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
username = self._get_param("Username") username = self._get_param("Username")
account, region = find_account_region_by_value("client_id", client_id) account, region = find_account_region_by_value("client_id", client_id)
@ -469,7 +476,7 @@ class CognitoIdpResponse(BaseResponse):
# on localhost (doesn't get a region in the host header), it doesn't know what # on localhost (doesn't get a region in the host header), it doesn't know what
# region's backend should handle the traffic, and we use `find_region_by_value` to # region's backend should handle the traffic, and we use `find_region_by_value` to
# solve that problem. # solve that problem.
def confirm_forgot_password(self): def confirm_forgot_password(self) -> str:
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
username = self._get_param("Username") username = self._get_param("Username")
password = self._get_param("Password") password = self._get_param("Password")
@ -481,7 +488,7 @@ class CognitoIdpResponse(BaseResponse):
return "" return ""
# Ditto the comment on confirm_forgot_password. # Ditto the comment on confirm_forgot_password.
def change_password(self): def change_password(self) -> str:
access_token = self._get_param("AccessToken") access_token = self._get_param("AccessToken")
previous_password = self._get_param("PreviousPassword") previous_password = self._get_param("PreviousPassword")
proposed_password = self._get_param("ProposedPassword") proposed_password = self._get_param("ProposedPassword")
@ -491,33 +498,33 @@ class CognitoIdpResponse(BaseResponse):
) )
return "" return ""
def admin_update_user_attributes(self): def admin_update_user_attributes(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
attributes = self._get_param("UserAttributes") attributes = self._get_param("UserAttributes")
self.backend.admin_update_user_attributes(user_pool_id, username, attributes) self.backend.admin_update_user_attributes(user_pool_id, username, attributes)
return "" return ""
def admin_delete_user_attributes(self): def admin_delete_user_attributes(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
attributes = self._get_param("UserAttributeNames") attributes = self._get_param("UserAttributeNames")
self.backend.admin_delete_user_attributes(user_pool_id, username, attributes) self.backend.admin_delete_user_attributes(user_pool_id, username, attributes)
return "" return ""
def admin_user_global_sign_out(self): def admin_user_global_sign_out(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
self.backend.admin_user_global_sign_out(user_pool_id, username) self.backend.admin_user_global_sign_out(user_pool_id, username)
return "" return ""
def global_sign_out(self): def global_sign_out(self) -> str:
access_token = self._get_param("AccessToken") access_token = self._get_param("AccessToken")
self.backend.global_sign_out(access_token) self.backend.global_sign_out(access_token)
return "" return ""
# Resource Server # Resource Server
def create_resource_server(self): def create_resource_server(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
identifier = self._get_param("Identifier") identifier = self._get_param("Identifier")
name = self._get_param("Name") name = self._get_param("Name")
@ -527,7 +534,7 @@ class CognitoIdpResponse(BaseResponse):
) )
return json.dumps({"ResourceServer": resource_server.to_json()}) return json.dumps({"ResourceServer": resource_server.to_json()})
def sign_up(self): def sign_up(self) -> str:
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
username = self._get_param("Username") username = self._get_param("Username")
password = self._get_param("Password") password = self._get_param("Password")
@ -544,13 +551,13 @@ class CognitoIdpResponse(BaseResponse):
} }
) )
def confirm_sign_up(self): def confirm_sign_up(self) -> str:
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
username = self._get_param("Username") username = self._get_param("Username")
region_agnostic_backend.confirm_sign_up(client_id=client_id, username=username) region_agnostic_backend.confirm_sign_up(client_id=client_id, username=username)
return "" return ""
def initiate_auth(self): def initiate_auth(self) -> str:
client_id = self._get_param("ClientId") client_id = self._get_param("ClientId")
auth_flow = self._get_param("AuthFlow") auth_flow = self._get_param("AuthFlow")
auth_parameters = self._get_param("AuthParameters") auth_parameters = self._get_param("AuthParameters")
@ -561,17 +568,17 @@ class CognitoIdpResponse(BaseResponse):
return json.dumps(auth_result) return json.dumps(auth_result)
def associate_software_token(self): def associate_software_token(self) -> str:
access_token = self._get_param("AccessToken") access_token = self._get_param("AccessToken")
result = self.backend.associate_software_token(access_token) result = self.backend.associate_software_token(access_token)
return json.dumps(result) return json.dumps(result)
def verify_software_token(self): def verify_software_token(self) -> str:
access_token = self._get_param("AccessToken") access_token = self._get_param("AccessToken")
result = self.backend.verify_software_token(access_token) result = self.backend.verify_software_token(access_token)
return json.dumps(result) return json.dumps(result)
def set_user_mfa_preference(self): def set_user_mfa_preference(self) -> str:
access_token = self._get_param("AccessToken") access_token = self._get_param("AccessToken")
software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings") software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings")
sms_mfa_settings = self._get_param("SMSMfaSettings") sms_mfa_settings = self._get_param("SMSMfaSettings")
@ -580,7 +587,7 @@ class CognitoIdpResponse(BaseResponse):
) )
return "" return ""
def admin_set_user_mfa_preference(self): def admin_set_user_mfa_preference(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings") software_token_mfa_settings = self._get_param("SoftwareTokenMfaSettings")
@ -590,7 +597,7 @@ class CognitoIdpResponse(BaseResponse):
) )
return "" return ""
def admin_set_user_password(self): def admin_set_user_password(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
username = self._get_param("Username") username = self._get_param("Username")
password = self._get_param("Password") password = self._get_param("Password")
@ -600,13 +607,13 @@ class CognitoIdpResponse(BaseResponse):
) )
return "" return ""
def add_custom_attributes(self): def add_custom_attributes(self) -> str:
user_pool_id = self._get_param("UserPoolId") user_pool_id = self._get_param("UserPoolId")
custom_attributes = self._get_param("CustomAttributes") custom_attributes = self._get_param("CustomAttributes")
self.backend.add_custom_attributes(user_pool_id, custom_attributes) self.backend.add_custom_attributes(user_pool_id, custom_attributes)
return "" return ""
def update_user_attributes(self): def update_user_attributes(self) -> str:
access_token = self._get_param("AccessToken") access_token = self._get_param("AccessToken")
attributes = self._get_param("UserAttributes") attributes = self._get_param("UserAttributes")
self.backend.update_user_attributes(access_token, attributes) self.backend.update_user_attributes(access_token, attributes)
@ -614,13 +621,16 @@ class CognitoIdpResponse(BaseResponse):
class CognitoIdpJsonWebKeyResponse(BaseResponse): class CognitoIdpJsonWebKeyResponse(BaseResponse):
def __init__(self): def __init__(self) -> None:
with open( with open(
os.path.join(os.path.dirname(__file__), "resources/jwks-public.json") os.path.join(os.path.dirname(__file__), "resources/jwks-public.json")
) as f: ) as f:
self.json_web_key = f.read() self.json_web_key = f.read()
def serve_json_web_key( def serve_json_web_key(
self, request, full_url, headers self,
): # pylint: disable=unused-argument request: Any, # pylint: disable=unused-argument
full_url: str, # pylint: disable=unused-argument
headers: Any, # pylint: disable=unused-argument
) -> Tuple[int, Dict[str, str], str]:
return 200, {"Content-Type": "application/json"}, self.json_web_key return 200, {"Content-Type": "application/json"}, self.json_web_key

View File

@ -4,6 +4,7 @@ import hmac
import base64 import base64
import re import re
from moto.moto_api._internal import mock_random as random from moto.moto_api._internal import mock_random as random
from typing import Any, Dict, List, Optional
FORMATS = { FORMATS = {
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
@ -51,13 +52,18 @@ PAGINATION_MODEL = {
} }
def create_id(): def create_id() -> str:
size = 26 size = 26
chars = list(range(10)) + list(string.ascii_lowercase) chars = list(range(10)) + list(string.ascii_lowercase)
return "".join(str(random.choice(chars)) for x in range(size)) return "".join(str(random.choice(chars)) for x in range(size))
def check_secret_hash(app_client_secret, app_client_id, username, secret_hash): def check_secret_hash(
app_client_secret: str,
app_client_id: str,
username: str,
secret_hash: Optional[str],
) -> bool:
key = bytes(str(app_client_secret).encode("latin-1")) key = bytes(str(app_client_secret).encode("latin-1"))
msg = bytes(str(username + app_client_id).encode("latin-1")) msg = bytes(str(username + app_client_id).encode("latin-1"))
new_digest = hmac.new(key, msg, hashlib.sha256).digest() new_digest = hmac.new(key, msg, hashlib.sha256).digest()
@ -65,36 +71,36 @@ def check_secret_hash(app_client_secret, app_client_id, username, secret_hash):
return SECRET_HASH == secret_hash return SECRET_HASH == secret_hash
def validate_username_format(username, _format="email"): def validate_username_format(username: str, _format: str = "email") -> bool:
# if the value of the `_format` param other than `email` or `phone_number`, # if the value of the `_format` param other than `email` or `phone_number`,
# the default value for the regex will match nothing and the # the default value for the regex will match nothing and the
# method will return None # method will return None
return re.fullmatch(FORMATS.get(_format, r"a^"), username) return re.fullmatch(FORMATS.get(_format, r"a^"), username) is not None
def flatten_attrs(attrs): def flatten_attrs(attrs: List[Dict[str, Any]]) -> Dict[str, Any]:
return {attr["Name"]: attr["Value"] for attr in attrs} return {attr["Name"]: attr["Value"] for attr in attrs}
def expand_attrs(attrs): def expand_attrs(attrs: Dict[str, Any]) -> List[Dict[str, Any]]:
return [{"Name": k, "Value": v} for k, v in attrs.items()] return [{"Name": k, "Value": v} for k, v in attrs.items()]
ID_HASH_STRATEGY = "HASH" ID_HASH_STRATEGY = "HASH"
def generate_id(strategy, *args): def generate_id(strategy: Optional[str], *args: Any) -> str:
if strategy == ID_HASH_STRATEGY: if strategy == ID_HASH_STRATEGY:
return _generate_id_hash(args) return _generate_id_hash(args)
else: else:
return _generate_id_uuid() return _generate_id_uuid()
def _generate_id_uuid(): def _generate_id_uuid() -> str:
return random.uuid4().hex return random.uuid4().hex
def _generate_id_hash(args): def _generate_id_hash(args: Any) -> str:
hasher = hashlib.sha256() hasher = hashlib.sha256()
for arg in args: for arg in args:

View File

@ -3,6 +3,7 @@ import os
import pathlib import pathlib
from functools import lru_cache from functools import lru_cache
from typing import Optional
TEST_SERVER_MODE = os.environ.get("TEST_SERVER_MODE", "0").lower() == "true" TEST_SERVER_MODE = os.environ.get("TEST_SERVER_MODE", "0").lower() == "true"
@ -130,5 +131,5 @@ def get_docker_host() -> str:
return "http://host.docker.internal" return "http://host.docker.internal"
def get_cognito_idp_user_pool_id_strategy(): def get_cognito_idp_user_pool_id_strategy() -> Optional[str]:
return os.environ.get("MOTO_COGNITO_IDP_USER_POOL_ID_STRATEGY") return os.environ.get("MOTO_COGNITO_IDP_USER_POOL_ID_STRATEGY")

View File

@ -2,13 +2,16 @@ import inspect
from copy import deepcopy from copy import deepcopy
from functools import wraps from functools import wraps
from typing import Dict, Any, Callable
from botocore.paginate import TokenDecoder, TokenEncoder from botocore.paginate import TokenDecoder, TokenEncoder
from moto.core.exceptions import InvalidToken from moto.core.exceptions import InvalidToken
def paginate(pagination_model, original_function=None): def paginate(
pagination_model: Dict[str, Any], original_function: Callable = None
) -> Callable:
def pagination_decorator(func): def pagination_decorator(func):
@wraps(func) @wraps(func)
def pagination_wrapper(*args, **kwargs): def pagination_wrapper(*args, **kwargs):

View File

@ -48,7 +48,7 @@ def filter_resources(resources, filters, attr_pairs):
return result return result
def md5_hash(data=None): def md5_hash(data: Any = None) -> Any:
""" """
MD5-hashing for non-security usecases. MD5-hashing for non-security usecases.
Required for Moto to work in FIPS-enabled systems Required for Moto to work in FIPS-enabled systems

View File

@ -18,7 +18,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy] [mypy]
files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit files= moto/a*,moto/b*,moto/ce,moto/cloudformation,moto/cloudfront,moto/cloudtrail,moto/codebuild,moto/cloudwatch,moto/codepipeline,moto/codecommit,moto/cognito*
show_column_numbers=True show_column_numbers=True
show_error_codes = True show_error_codes = True
disable_error_code=abstract disable_error_code=abstract