From dd556a66c6f33d75a0bde70722ee0a04b06619fb Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 20 Jun 2020 10:43:02 +0100 Subject: [PATCH 1/3] CognitoIDP - Return KID in headers of ID token --- moto/cognitoidp/models.py | 6 +- moto/cognitoidp/urls.py | 2 +- tests/test_cognitoidp/test_cognitoidp.py | 75 +++++++++++++++++++++++- 3 files changed, 80 insertions(+), 3 deletions(-) diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index 93e297551..4b4e0a8b1 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -128,8 +128,12 @@ class CognitoIdpUserPool(BaseModel): "exp": now + expires_in, } payload.update(extra_data) + headers = {"kid": "dummy"} # KID as present in jwks-public.json - return jws.sign(payload, self.json_web_key, algorithm="RS256"), expires_in + return ( + jws.sign(payload, self.json_web_key, headers, algorithm="RS256"), + expires_in, + ) def create_id_token(self, client_id, username): extra_data = self.get_user_extra_data_by_client_id(client_id, username) diff --git a/moto/cognitoidp/urls.py b/moto/cognitoidp/urls.py index 5d1dff1d0..09e675e70 100644 --- a/moto/cognitoidp/urls.py +++ b/moto/cognitoidp/urls.py @@ -5,5 +5,5 @@ url_bases = ["https?://cognito-idp.(.+).amazonaws.com"] url_paths = { "{0}/$": CognitoIdpResponse.dispatch, - "{0}//.well-known/jwks.json$": CognitoIdpJsonWebKeyResponse().serve_json_web_key, + "{0}/(?P[^/]+)/.well-known/jwks.json$": CognitoIdpJsonWebKeyResponse().serve_json_web_key, } diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index 37e1a56a3..aefa573ef 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import json import os import random +import requests import uuid import boto3 @@ -10,7 +11,7 @@ import boto3 # noinspection PyUnresolvedReferences import sure # noqa from botocore.exceptions import ClientError -from jose import jws +from jose import jws, jwk, jwt from nose.tools import assert_raises from moto import mock_cognitoidp @@ -1309,3 +1310,75 @@ def test_admin_update_user_attributes(): val.should.equal("Doe") elif attr["Name"] == "given_name": val.should.equal("Jane") + + +@mock_cognitoidp +def test_idtoken_contains_kid_header(): + # https://github.com/spulec/moto/issues/3078 + # Setup + cognito = boto3.client("cognito-idp", "us-west-2") + user_pool_id = cognito.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"][ + "Id" + ] + client = cognito.create_user_pool_client( + UserPoolId=user_pool_id, + ExplicitAuthFlows=[ + "ALLOW_ADMIN_USER_PASSWORD_AUTH", + "ALLOW_REFRESH_TOKEN_AUTH", + "ALLOW_ADMIN_NO_SRP_AUTH", + ], + AllowedOAuthFlows=["code", "implicit"], + ClientName=str(uuid.uuid4()), + CallbackURLs=["https://example.com"], + ) + client_id = client["UserPoolClient"]["ClientId"] + username = str(uuid.uuid4()) + temporary_password = "1TemporaryP@ssword" + cognito.admin_create_user( + UserPoolId=user_pool_id, Username=username, TemporaryPassword=temporary_password + ) + result = cognito.admin_initiate_auth( + UserPoolId=user_pool_id, + ClientId=client_id, + AuthFlow="ADMIN_NO_SRP_AUTH", + AuthParameters={"USERNAME": username, "PASSWORD": temporary_password}, + ) + + # A newly created user is forced to set a new password + # This sets a new password and logs the user in (creates tokens) + password = "1F@kePassword" + result = cognito.respond_to_auth_challenge( + Session=result["Session"], + ClientId=client_id, + ChallengeName="NEW_PASSWORD_REQUIRED", + ChallengeResponses={"USERNAME": username, "NEW_PASSWORD": password}, + ) + # + id_token = result["AuthenticationResult"]["IdToken"] + + # Verify the KID header is present in the token, and corresponds to the KID supplied by the public JWT + verify_kid_header(id_token) + + +def verify_kid_header(token): + """Verifies the kid-header is corresponds with the public key""" + headers = jwt.get_unverified_headers(token) + kid = headers["kid"] + + key_index = -1 + keys = fetch_public_keys() + for i in range(len(keys)): + if kid == keys[i]["kid"]: + key_index = i + break + if key_index == -1: + raise Exception("Public key (kid) not found in jwks.json") + + +def fetch_public_keys(): + keys_url = "https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json".format( + "us-west-2", "someuserpoolid" + ) + response = requests.get(keys_url).text + my_keys = json.loads(response.decode("utf-8"))["keys"] + return my_keys From 655b92a2a4288407705f07ae7cd468ca5b14081f Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 20 Jun 2020 11:05:06 +0100 Subject: [PATCH 2/3] Simplify Cognito test - auto decode JSON --- tests/test_cognitoidp/test_cognitoidp.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index aefa573ef..5eb529e28 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -1379,6 +1379,5 @@ def fetch_public_keys(): keys_url = "https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json".format( "us-west-2", "someuserpoolid" ) - response = requests.get(keys_url).text - my_keys = json.loads(response.decode("utf-8"))["keys"] - return my_keys + response = requests.get(keys_url).json() + return response["keys"] From f27e29e04d51b800a87be244bbe9c86231f59dea Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 20 Jun 2020 12:48:10 +0100 Subject: [PATCH 3/3] Cognito - Dont run test in ServerMode --- tests/test_cognitoidp/test_cognitoidp.py | 96 +++++++++++++----------- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index 5eb529e28..3b7037889 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -14,7 +14,7 @@ from botocore.exceptions import ClientError from jose import jws, jwk, jwt from nose.tools import assert_raises -from moto import mock_cognitoidp +from moto import mock_cognitoidp, settings from moto.core import ACCOUNT_ID @@ -1312,52 +1312,58 @@ def test_admin_update_user_attributes(): val.should.equal("Jane") -@mock_cognitoidp -def test_idtoken_contains_kid_header(): - # https://github.com/spulec/moto/issues/3078 - # Setup - cognito = boto3.client("cognito-idp", "us-west-2") - user_pool_id = cognito.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"][ - "Id" - ] - client = cognito.create_user_pool_client( - UserPoolId=user_pool_id, - ExplicitAuthFlows=[ - "ALLOW_ADMIN_USER_PASSWORD_AUTH", - "ALLOW_REFRESH_TOKEN_AUTH", - "ALLOW_ADMIN_NO_SRP_AUTH", - ], - AllowedOAuthFlows=["code", "implicit"], - ClientName=str(uuid.uuid4()), - CallbackURLs=["https://example.com"], - ) - client_id = client["UserPoolClient"]["ClientId"] - username = str(uuid.uuid4()) - temporary_password = "1TemporaryP@ssword" - cognito.admin_create_user( - UserPoolId=user_pool_id, Username=username, TemporaryPassword=temporary_password - ) - result = cognito.admin_initiate_auth( - UserPoolId=user_pool_id, - ClientId=client_id, - AuthFlow="ADMIN_NO_SRP_AUTH", - AuthParameters={"USERNAME": username, "PASSWORD": temporary_password}, - ) +# Test will retrieve public key from cognito.amazonaws.com/.well-known/jwks.json, +# which isnt mocked in ServerMode +if not settings.TEST_SERVER_MODE: - # A newly created user is forced to set a new password - # This sets a new password and logs the user in (creates tokens) - password = "1F@kePassword" - result = cognito.respond_to_auth_challenge( - Session=result["Session"], - ClientId=client_id, - ChallengeName="NEW_PASSWORD_REQUIRED", - ChallengeResponses={"USERNAME": username, "NEW_PASSWORD": password}, - ) - # - id_token = result["AuthenticationResult"]["IdToken"] + @mock_cognitoidp + def test_idtoken_contains_kid_header(): + # https://github.com/spulec/moto/issues/3078 + # Setup + cognito = boto3.client("cognito-idp", "us-west-2") + user_pool_id = cognito.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"][ + "Id" + ] + client = cognito.create_user_pool_client( + UserPoolId=user_pool_id, + ExplicitAuthFlows=[ + "ALLOW_ADMIN_USER_PASSWORD_AUTH", + "ALLOW_REFRESH_TOKEN_AUTH", + "ALLOW_ADMIN_NO_SRP_AUTH", + ], + AllowedOAuthFlows=["code", "implicit"], + ClientName=str(uuid.uuid4()), + CallbackURLs=["https://example.com"], + ) + client_id = client["UserPoolClient"]["ClientId"] + username = str(uuid.uuid4()) + temporary_password = "1TemporaryP@ssword" + cognito.admin_create_user( + UserPoolId=user_pool_id, + Username=username, + TemporaryPassword=temporary_password, + ) + result = cognito.admin_initiate_auth( + UserPoolId=user_pool_id, + ClientId=client_id, + AuthFlow="ADMIN_NO_SRP_AUTH", + AuthParameters={"USERNAME": username, "PASSWORD": temporary_password}, + ) - # Verify the KID header is present in the token, and corresponds to the KID supplied by the public JWT - verify_kid_header(id_token) + # A newly created user is forced to set a new password + # This sets a new password and logs the user in (creates tokens) + password = "1F@kePassword" + result = cognito.respond_to_auth_challenge( + Session=result["Session"], + ClientId=client_id, + ChallengeName="NEW_PASSWORD_REQUIRED", + ChallengeResponses={"USERNAME": username, "NEW_PASSWORD": password}, + ) + # + id_token = result["AuthenticationResult"]["IdToken"] + + # Verify the KID header is present in the token, and corresponds to the KID supplied by the public JWT + verify_kid_header(id_token) def verify_kid_header(token):