Merge pull request #2311 from caguado/fix/2310

Implement get_open_id_token
This commit is contained in:
Steve Pulec 2019-07-20 00:12:18 -04:00 committed by GitHub
commit 5c34c06d07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 69 additions and 19 deletions

View File

@ -815,16 +815,16 @@
- [ ] update_user_profile
## cognito-identity - 0% implemented
- [ ] create_identity_pool
- [X] create_identity_pool
- [ ] delete_identities
- [ ] delete_identity_pool
- [ ] describe_identity
- [ ] describe_identity_pool
- [ ] get_credentials_for_identity
- [ ] get_id
- [X] get_credentials_for_identity
- [X] get_id
- [ ] get_identity_pool_roles
- [ ] get_open_id_token
- [ ] get_open_id_token_for_developer_identity
- [X] get_open_id_token
- [X] get_open_id_token_for_developer_identity
- [ ] list_identities
- [ ] list_identity_pools
- [ ] lookup_developer_identity

View File

@ -95,6 +95,15 @@ class CognitoIdentityBackend(BaseBackend):
})
return response
def get_open_id_token(self, identity_id):
response = json.dumps(
{
"IdentityId": identity_id,
"Token": get_random_identity_id(self.region)
}
)
return response
cognitoidentity_backends = {}
for region in boto.cognito.identity.regions():

View File

@ -35,3 +35,8 @@ class CognitoIdentityResponse(BaseResponse):
return cognitoidentity_backends[self.region].get_open_id_token_for_developer_identity(
self._get_param('IdentityId') or get_random_identity_id(self.region)
)
def get_open_id_token(self):
return cognitoidentity_backends[self.region].get_open_id_token(
self._get_param("IdentityId") or get_random_identity_id(self.region)
)

View File

@ -21,6 +21,16 @@ from moto.core.utils import convert_flask_to_httpretty_response
HTTP_METHODS = ["GET", "POST", "PUT", "DELETE", "HEAD", "PATCH"]
DEFAULT_SERVICE_REGION = ('s3', 'us-east-1')
# Map of unsigned calls to service-region as per AWS API docs
# https://docs.aws.amazon.com/cognito/latest/developerguide/resource-permissions.html#amazon-cognito-signed-versus-unsigned-apis
UNSIGNED_REQUESTS = {
'AWSCognitoIdentityService': ('cognito-identity', 'us-east-1'),
'AWSCognitoIdentityProviderService': ('cognito-idp', 'us-east-1'),
}
class DomainDispatcherApplication(object):
"""
Dispatch requests to different applications based on the "Host:" header
@ -50,6 +60,32 @@ class DomainDispatcherApplication(object):
raise RuntimeError('Invalid host: "%s"' % host)
def infer_service_region(self, environ):
auth = environ.get('HTTP_AUTHORIZATION')
if auth:
# Signed request
# Parse auth header to find service assuming a SigV4 request
# https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
# ['Credential=sdffdsa', '20170220', 'us-east-1', 'sns', 'aws4_request']
try:
credential_scope = auth.split(",")[0].split()[1]
_, _, region, service, _ = credential_scope.split("/")
return service, region
except ValueError:
# Signature format does not match, this is exceptional and we can't
# infer a service-region. A reduced set of services still use
# the deprecated SigV2, ergo prefer S3 as most likely default.
# https://docs.aws.amazon.com/general/latest/gr/signature-version-2.html
return DEFAULT_SERVICE_REGION
else:
# Unsigned request
target = environ.get('HTTP_X_AMZ_TARGET')
if target:
service, _ = target.split('.', 1)
return UNSIGNED_REQUESTS.get(service, DEFAULT_SERVICE_REGION)
# S3 is the last resort when the target is also unknown
return DEFAULT_SERVICE_REGION
def get_application(self, environ):
path_info = environ.get('PATH_INFO', '')
@ -66,19 +102,7 @@ class DomainDispatcherApplication(object):
else:
host = environ['HTTP_HOST'].split(':')[0]
if host in {'localhost', 'motoserver'} or host.startswith("192.168."):
# Fall back to parsing auth header to find service
# ['Credential=sdffdsa', '20170220', 'us-east-1', 'sns', 'aws4_request']
try:
_, _, region, service, _ = environ['HTTP_AUTHORIZATION'].split(",")[0].split()[
1].split("/")
except (KeyError, ValueError):
# Some cognito-idp endpoints (e.g. change password) do not receive an auth header.
if environ.get('HTTP_X_AMZ_TARGET', '').startswith('AWSCognitoIdentityProviderService'):
service = 'cognito-idp'
else:
service = 's3'
region = 'us-east-1'
service, region = self.infer_service_region(environ)
if service == 'dynamodb':
if environ['HTTP_X_AMZ_TARGET'].startswith('DynamoDBStreams'):
host = 'dynamodbstreams'

View File

@ -68,7 +68,7 @@ def test_get_open_id_token_for_developer_identity():
},
TokenDuration=123
)
assert len(result['Token'])
assert len(result['Token']) > 0
assert result['IdentityId'] == '12345'
@mock_cognitoidentity
@ -83,3 +83,15 @@ def test_get_open_id_token_for_developer_identity_when_no_explicit_identity_id()
)
assert len(result['Token']) > 0
assert len(result['IdentityId']) > 0
@mock_cognitoidentity
def test_get_open_id_token():
conn = boto3.client('cognito-identity', 'us-west-2')
result = conn.get_open_id_token(
IdentityId='12345',
Logins={
'someurl': '12345'
}
)
assert len(result['Token']) > 0
assert result['IdentityId'] == '12345'