import datetime import json import re from collections import OrderedDict from typing import Any, Dict, List, Optional from moto.core import BaseBackend, BackendDict, BaseModel from moto.core.utils import iso_8601_datetime_with_milliseconds from .exceptions import InvalidNameException, ResourceNotFoundError from .utils import get_random_identity_id class CognitoIdentityPool(BaseModel): def __init__(self, region: str, identity_pool_name: str, **kwargs: Any): self.identity_pool_name = identity_pool_name if not re.fullmatch(r"[\w\s+=,.@-]+", identity_pool_name): raise InvalidNameException(identity_pool_name) self.allow_unauthenticated_identities = kwargs.get( "allow_unauthenticated_identities", "" ) self.supported_login_providers = kwargs.get("supported_login_providers", {}) self.developer_provider_name = kwargs.get("developer_provider_name", "") self.open_id_connect_provider_arns = kwargs.get( "open_id_connect_provider_arns", [] ) self.cognito_identity_providers = kwargs.get("cognito_identity_providers", []) self.saml_provider_arns = kwargs.get("saml_provider_arns", []) self.identity_pool_id = get_random_identity_id(region) self.creation_time = datetime.datetime.utcnow() self.tags = kwargs.get("tags") or {} def to_json(self) -> str: return json.dumps( { "IdentityPoolId": self.identity_pool_id, "IdentityPoolName": self.identity_pool_name, "AllowUnauthenticatedIdentities": self.allow_unauthenticated_identities, "SupportedLoginProviders": self.supported_login_providers, "DeveloperProviderName": self.developer_provider_name, "OpenIdConnectProviderARNs": self.open_id_connect_provider_arns, "CognitoIdentityProviders": self.cognito_identity_providers, "SamlProviderARNs": self.saml_provider_arns, "IdentityPoolTags": self.tags, } ) class CognitoIdentityBackend(BaseBackend): def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.identity_pools: Dict[str, CognitoIdentityPool] = OrderedDict() self.pools_identities: Dict[str, Dict[str, Any]] = {} def describe_identity_pool(self, identity_pool_id: str) -> str: identity_pool = self.identity_pools.get(identity_pool_id, None) if not identity_pool: raise ResourceNotFoundError(identity_pool_id) return identity_pool.to_json() def create_identity_pool( self, identity_pool_name: str, allow_unauthenticated_identities: bool, supported_login_providers: Dict[str, str], developer_provider_name: str, open_id_connect_provider_arns: List[str], cognito_identity_providers: List[Dict[str, Any]], saml_provider_arns: List[str], tags: Dict[str, str], ) -> str: new_identity = CognitoIdentityPool( self.region_name, identity_pool_name, allow_unauthenticated_identities=allow_unauthenticated_identities, supported_login_providers=supported_login_providers, developer_provider_name=developer_provider_name, open_id_connect_provider_arns=open_id_connect_provider_arns, cognito_identity_providers=cognito_identity_providers, saml_provider_arns=saml_provider_arns, tags=tags, ) self.identity_pools[new_identity.identity_pool_id] = new_identity self.pools_identities.update( { new_identity.identity_pool_id: { "IdentityPoolId": new_identity.identity_pool_id, "Identities": [], } } ) return new_identity.to_json() def update_identity_pool( self, identity_pool_id: str, identity_pool_name: str, allow_unauthenticated: Optional[bool], login_providers: Optional[Dict[str, str]], provider_name: Optional[str], provider_arns: Optional[List[str]], identity_providers: Optional[List[Dict[str, Any]]], saml_providers: Optional[List[str]], tags: Optional[Dict[str, str]], ) -> str: """ The AllowClassic-parameter has not yet been implemented """ pool = self.identity_pools[identity_pool_id] pool.identity_pool_name = pool.identity_pool_name or identity_pool_name if allow_unauthenticated is not None: pool.allow_unauthenticated_identities = allow_unauthenticated if login_providers is not None: pool.supported_login_providers = login_providers if provider_name: pool.developer_provider_name = provider_name if provider_arns is not None: pool.open_id_connect_provider_arns = provider_arns if identity_providers is not None: pool.cognito_identity_providers = identity_providers if saml_providers is not None: pool.saml_provider_arns = saml_providers if tags: pool.tags = tags return pool.to_json() def get_id(self, identity_pool_id: str) -> str: identity_id = {"IdentityId": get_random_identity_id(self.region_name)} self.pools_identities[identity_pool_id]["Identities"].append(identity_id) return json.dumps(identity_id) def get_credentials_for_identity(self, identity_id: str) -> str: duration = 90 now = datetime.datetime.utcnow() expiration = now + datetime.timedelta(seconds=duration) expiration_str = str(iso_8601_datetime_with_milliseconds(expiration)) return json.dumps( { "Credentials": { "AccessKeyId": "TESTACCESSKEY12345", "Expiration": expiration_str, "SecretKey": "ABCSECRETKEY", "SessionToken": "ABC12345", }, "IdentityId": identity_id, } ) def get_open_id_token_for_developer_identity(self, identity_id: str) -> str: return json.dumps( { "IdentityId": identity_id, "Token": get_random_identity_id(self.region_name), } ) def get_open_id_token(self, identity_id: str) -> str: return json.dumps( { "IdentityId": identity_id, "Token": get_random_identity_id(self.region_name), } ) def list_identities(self, identity_pool_id: str) -> str: """ The MaxResults-parameter has not yet been implemented """ return json.dumps(self.pools_identities[identity_pool_id]) cognitoidentity_backends = BackendDict(CognitoIdentityBackend, "cognito-identity")