Merge pull request #1548 from brcoding/master

Created Cognito-Identity with partial coverage and unit tests
This commit is contained in:
Steve Pulec 2018-04-12 18:55:13 -04:00 committed by GitHub
commit 963e28ecca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 280 additions and 0 deletions

View File

@ -70,6 +70,8 @@ It gets even better! Moto isn't just for Python code and it isn't just for S3. L
|------------------------------------------------------------------------------|
| CloudwatchEvents | @mock_events | all endpoints done |
|------------------------------------------------------------------------------|
| Cognito Identity | @mock_cognitoidentity| basic endpoints done |
|------------------------------------------------------------------------------|
| Data Pipeline | @mock_datapipeline| basic endpoints done |
|------------------------------------------------------------------------------|
| DynamoDB | @mock_dynamodb | core endpoints done |

View File

@ -11,6 +11,7 @@ from .autoscaling import mock_autoscaling, mock_autoscaling_deprecated # flake8
from .awslambda import mock_lambda, mock_lambda_deprecated # flake8: noqa
from .cloudformation import mock_cloudformation, mock_cloudformation_deprecated # flake8: noqa
from .cloudwatch import mock_cloudwatch, mock_cloudwatch_deprecated # flake8: noqa
from .cognitoidentity import mock_cognitoidentity, mock_cognitoidentity_deprecated # flake8: noqa
from .datapipeline import mock_datapipeline, mock_datapipeline_deprecated # flake8: noqa
from .dynamodb import mock_dynamodb, mock_dynamodb_deprecated # flake8: noqa
from .dynamodb2 import mock_dynamodb2, mock_dynamodb2_deprecated # flake8: noqa

View File

@ -6,6 +6,7 @@ from moto.autoscaling import autoscaling_backends
from moto.awslambda import lambda_backends
from moto.cloudformation import cloudformation_backends
from moto.cloudwatch import cloudwatch_backends
from moto.cognitoidentity import cognitoidentity_backends
from moto.core import moto_api_backends
from moto.datapipeline import datapipeline_backends
from moto.dynamodb import dynamodb_backends
@ -49,6 +50,7 @@ BACKENDS = {
'batch': batch_backends,
'cloudformation': cloudformation_backends,
'cloudwatch': cloudwatch_backends,
'cognito-identity': cognitoidentity_backends,
'datapipeline': datapipeline_backends,
'dynamodb': dynamodb_backends,
'dynamodb2': dynamodb_backends2,

View File

@ -10,6 +10,7 @@ from moto.autoscaling import models as autoscaling_models
from moto.awslambda import models as lambda_models
from moto.batch import models as batch_models
from moto.cloudwatch import models as cloudwatch_models
from moto.cognitoidentity import models as cognitoidentity_models
from moto.datapipeline import models as datapipeline_models
from moto.dynamodb import models as dynamodb_models
from moto.ec2 import models as ec2_models
@ -65,6 +66,7 @@ MODEL_MAP = {
"AWS::ElasticLoadBalancingV2::LoadBalancer": elbv2_models.FakeLoadBalancer,
"AWS::ElasticLoadBalancingV2::TargetGroup": elbv2_models.FakeTargetGroup,
"AWS::ElasticLoadBalancingV2::Listener": elbv2_models.FakeListener,
"AWS::Cognito::IdentityPool": cognitoidentity_models.CognitoIdentity,
"AWS::DataPipeline::Pipeline": datapipeline_models.Pipeline,
"AWS::IAM::InstanceProfile": iam_models.InstanceProfile,
"AWS::IAM::Role": iam_models.Role,

View File

@ -0,0 +1,7 @@
from __future__ import unicode_literals
from .models import cognitoidentity_backends
from ..core.models import base_decorator, deprecated_base_decorator
cognitoidentity_backend = cognitoidentity_backends['us-east-1']
mock_cognitoidentity = base_decorator(cognitoidentity_backends)
mock_cognitoidentity_deprecated = deprecated_base_decorator(cognitoidentity_backends)

View File

@ -0,0 +1,101 @@
from __future__ import unicode_literals
import datetime
import json
import boto.cognito.identity
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from .utils import get_random_identity_id
class CognitoIdentity(BaseModel):
def __init__(self, region, identity_pool_name, **kwargs):
self.identity_pool_name = identity_pool_name
self.allow_unauthenticated_identities = kwargs.get('allow_unauthenticated_identities', '')
self.supported_login_providers = kwargs.get('supported_login_providers', {})
self.developer_provider_name = kwargs.get('developer_provider_name', '')
self.open_id_connect_provider_arns = kwargs.get('open_id_connect_provider_arns', [])
self.cognito_identity_providers = kwargs.get('cognito_identity_providers', [])
self.saml_provider_arns = kwargs.get('saml_provider_arns', [])
self.identity_pool_id = get_random_identity_id(region)
self.creation_time = datetime.datetime.utcnow()
class CognitoIdentityBackend(BaseBackend):
def __init__(self, region):
super(CognitoIdentityBackend, self).__init__()
self.region = region
self.identity_pools = OrderedDict()
def reset(self):
region = self.region
self.__dict__ = {}
self.__init__(region)
def create_identity_pool(self, identity_pool_name, allow_unauthenticated_identities,
supported_login_providers, developer_provider_name, open_id_connect_provider_arns,
cognito_identity_providers, saml_provider_arns):
new_identity = CognitoIdentity(self.region, identity_pool_name,
allow_unauthenticated_identities=allow_unauthenticated_identities,
supported_login_providers=supported_login_providers,
developer_provider_name=developer_provider_name,
open_id_connect_provider_arns=open_id_connect_provider_arns,
cognito_identity_providers=cognito_identity_providers,
saml_provider_arns=saml_provider_arns)
self.identity_pools[new_identity.identity_pool_id] = new_identity
response = json.dumps({
'IdentityPoolId': new_identity.identity_pool_id,
'IdentityPoolName': new_identity.identity_pool_name,
'AllowUnauthenticatedIdentities': new_identity.allow_unauthenticated_identities,
'SupportedLoginProviders': new_identity.supported_login_providers,
'DeveloperProviderName': new_identity.developer_provider_name,
'OpenIdConnectProviderARNs': new_identity.open_id_connect_provider_arns,
'CognitoIdentityProviders': new_identity.cognito_identity_providers,
'SamlProviderARNs': new_identity.saml_provider_arns
})
return response
def get_id(self):
identity_id = {'IdentityId': get_random_identity_id(self.region)}
return json.dumps(identity_id)
def get_credentials_for_identity(self, identity_id):
duration = 90
now = datetime.datetime.utcnow()
expiration = now + datetime.timedelta(seconds=duration)
expiration_str = str(iso_8601_datetime_with_milliseconds(expiration))
response = json.dumps(
{
"Credentials":
{
"AccessKeyId": "TESTACCESSKEY12345",
"Expiration": expiration_str,
"SecretKey": "ABCSECRETKEY",
"SessionToken": "ABC12345"
},
"IdentityId": identity_id
})
return response
def get_open_id_token_for_developer_identity(self, identity_id):
response = json.dumps(
{
"IdentityId": identity_id,
"Token": get_random_identity_id(self.region)
})
return response
cognitoidentity_backends = {}
for region in boto.cognito.identity.regions():
cognitoidentity_backends[region.name] = CognitoIdentityBackend(region.name)

View File

@ -0,0 +1,34 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from .models import cognitoidentity_backends
class CognitoIdentityResponse(BaseResponse):
def create_identity_pool(self):
identity_pool_name = self._get_param('IdentityPoolName')
allow_unauthenticated_identities = self._get_param('AllowUnauthenticatedIdentities')
supported_login_providers = self._get_param('SupportedLoginProviders')
developer_provider_name = self._get_param('DeveloperProviderName')
open_id_connect_provider_arns = self._get_param('OpenIdConnectProviderARNs')
cognito_identity_providers = self._get_param('CognitoIdentityProviders')
saml_provider_arns = self._get_param('SamlProviderARNs')
return cognitoidentity_backends[self.region].create_identity_pool(
identity_pool_name=identity_pool_name,
allow_unauthenticated_identities=allow_unauthenticated_identities,
supported_login_providers=supported_login_providers,
developer_provider_name=developer_provider_name,
open_id_connect_provider_arns=open_id_connect_provider_arns,
cognito_identity_providers=cognito_identity_providers,
saml_provider_arns=saml_provider_arns)
def get_id(self):
return cognitoidentity_backends[self.region].get_id()
def get_credentials_for_identity(self):
return cognitoidentity_backends[self.region].get_credentials_for_identity(self._get_param('IdentityId'))
def get_open_id_token_for_developer_identity(self):
return cognitoidentity_backends[self.region].get_open_id_token_for_developer_identity(self._get_param('IdentityId'))

View File

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from .responses import CognitoIdentityResponse
url_bases = [
"https?://cognito-identity.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': CognitoIdentityResponse.dispatch,
}

View File

@ -0,0 +1,5 @@
from moto.core.utils import get_random_hex
def get_random_identity_id(region):
return "{0}:{0}".format(region, get_random_hex(length=19))

View File

@ -0,0 +1,71 @@
from __future__ import unicode_literals
import boto3
from moto import mock_cognitoidentity
import sure # noqa
from moto.cognitoidentity.utils import get_random_identity_id
@mock_cognitoidentity
def test_create_identity_pool():
conn = boto3.client('cognito-identity', 'us-west-2')
result = conn.create_identity_pool(IdentityPoolName='TestPool',
AllowUnauthenticatedIdentities=False,
SupportedLoginProviders={'graph.facebook.com': '123456789012345'},
DeveloperProviderName='devname',
OpenIdConnectProviderARNs=['arn:aws:rds:eu-west-2:123456789012:db:mysql-db'],
CognitoIdentityProviders=[
{
'ProviderName': 'testprovider',
'ClientId': 'CLIENT12345',
'ServerSideTokenCheck': True
},
],
SamlProviderARNs=['arn:aws:rds:eu-west-2:123456789012:db:mysql-db'])
assert result['IdentityPoolId'] != ''
# testing a helper function
def test_get_random_identity_id():
assert len(get_random_identity_id('us-west-2')) > 0
@mock_cognitoidentity
def test_get_id():
# These two do NOT work in server mode. They just don't return the data from the model.
conn = boto3.client('cognito-identity', 'us-west-2')
result = conn.get_id(AccountId='someaccount',
IdentityPoolId='us-west-2:12345',
Logins={
'someurl': '12345'
})
print(result)
assert result.get('IdentityId', "").startswith('us-west-2') or result.get('ResponseMetadata').get('HTTPStatusCode') == 200
@mock_cognitoidentity
def test_get_credentials_for_identity():
# These two do NOT work in server mode. They just don't return the data from the model.
conn = boto3.client('cognito-identity', 'us-west-2')
result = conn.get_credentials_for_identity(IdentityId='12345')
assert result.get('Expiration', 0) > 0 or result.get('ResponseMetadata').get('HTTPStatusCode') == 200
assert result.get('IdentityId') == '12345' or result.get('ResponseMetadata').get('HTTPStatusCode') == 200
@mock_cognitoidentity
def test_get_open_id_token_for_developer_identity():
conn = boto3.client('cognito-identity', 'us-west-2')
result = conn.get_open_id_token_for_developer_identity(
IdentityPoolId='us-west-2:12345',
IdentityId='12345',
Logins={
'someurl': '12345'
},
TokenDuration=123
)
assert len(result['Token'])
assert result['IdentityId'] == '12345'

View File

@ -0,0 +1,45 @@
from __future__ import unicode_literals
import json
import sure # noqa
import moto.server as server
from moto import mock_cognitoidentity
'''
Test the different server responses
'''
@mock_cognitoidentity
def test_create_identity_pool():
backend = server.create_backend_app("cognito-identity")
test_client = backend.test_client()
res = test_client.post('/',
data={"IdentityPoolName": "test", "AllowUnauthenticatedIdentities": True},
headers={
"X-Amz-Target": "com.amazonaws.cognito.identity.model.AWSCognitoIdentityService.CreateIdentityPool"},
)
json_data = json.loads(res.data.decode("utf-8"))
assert json_data['IdentityPoolName'] == "test"
@mock_cognitoidentity
def test_get_id():
backend = server.create_backend_app("cognito-identity")
test_client = backend.test_client()
res = test_client.post('/',
data=json.dumps({'AccountId': 'someaccount',
'IdentityPoolId': 'us-west-2:12345',
'Logins': {'someurl': '12345'}}),
headers={
"X-Amz-Target": "com.amazonaws.cognito.identity.model.AWSCognitoIdentityService.GetId"},
)
print(res.data)
json_data = json.loads(res.data.decode("utf-8"))
assert ':' in json_data['IdentityId']