Merge pull request #3081 from bblommers/cognitoidp_return_kid_header
CognitoIDP: Return kid header as part of respond_to_auth_challenge
This commit is contained in:
commit
f061fbf6b9
@ -128,8 +128,12 @@ class CognitoIdpUserPool(BaseModel):
|
||||
"exp": now + expires_in,
|
||||
}
|
||||
payload.update(extra_data)
|
||||
headers = {"kid": "dummy"} # KID as present in jwks-public.json
|
||||
|
||||
return jws.sign(payload, self.json_web_key, algorithm="RS256"), expires_in
|
||||
return (
|
||||
jws.sign(payload, self.json_web_key, headers, algorithm="RS256"),
|
||||
expires_in,
|
||||
)
|
||||
|
||||
def create_id_token(self, client_id, username):
|
||||
extra_data = self.get_user_extra_data_by_client_id(client_id, username)
|
||||
|
@ -5,5 +5,5 @@ url_bases = ["https?://cognito-idp.(.+).amazonaws.com"]
|
||||
|
||||
url_paths = {
|
||||
"{0}/$": CognitoIdpResponse.dispatch,
|
||||
"{0}/<user_pool_id>/.well-known/jwks.json$": CognitoIdpJsonWebKeyResponse().serve_json_web_key,
|
||||
"{0}/(?P<user_pool_id>[^/]+)/.well-known/jwks.json$": CognitoIdpJsonWebKeyResponse().serve_json_web_key,
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ from __future__ import unicode_literals
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import requests
|
||||
import uuid
|
||||
|
||||
import boto3
|
||||
@ -10,10 +11,10 @@ import boto3
|
||||
# noinspection PyUnresolvedReferences
|
||||
import sure # noqa
|
||||
from botocore.exceptions import ClientError
|
||||
from jose import jws
|
||||
from jose import jws, jwk, jwt
|
||||
from nose.tools import assert_raises
|
||||
|
||||
from moto import mock_cognitoidp
|
||||
from moto import mock_cognitoidp, settings
|
||||
from moto.core import ACCOUNT_ID
|
||||
|
||||
|
||||
@ -1341,3 +1342,80 @@ def test_admin_update_user_attributes():
|
||||
val.should.equal("Doe")
|
||||
elif attr["Name"] == "given_name":
|
||||
val.should.equal("Jane")
|
||||
|
||||
|
||||
# Test will retrieve public key from cognito.amazonaws.com/.well-known/jwks.json,
|
||||
# which isnt mocked in ServerMode
|
||||
if not settings.TEST_SERVER_MODE:
|
||||
|
||||
@mock_cognitoidp
|
||||
def test_idtoken_contains_kid_header():
|
||||
# https://github.com/spulec/moto/issues/3078
|
||||
# Setup
|
||||
cognito = boto3.client("cognito-idp", "us-west-2")
|
||||
user_pool_id = cognito.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"][
|
||||
"Id"
|
||||
]
|
||||
client = cognito.create_user_pool_client(
|
||||
UserPoolId=user_pool_id,
|
||||
ExplicitAuthFlows=[
|
||||
"ALLOW_ADMIN_USER_PASSWORD_AUTH",
|
||||
"ALLOW_REFRESH_TOKEN_AUTH",
|
||||
"ALLOW_ADMIN_NO_SRP_AUTH",
|
||||
],
|
||||
AllowedOAuthFlows=["code", "implicit"],
|
||||
ClientName=str(uuid.uuid4()),
|
||||
CallbackURLs=["https://example.com"],
|
||||
)
|
||||
client_id = client["UserPoolClient"]["ClientId"]
|
||||
username = str(uuid.uuid4())
|
||||
temporary_password = "1TemporaryP@ssword"
|
||||
cognito.admin_create_user(
|
||||
UserPoolId=user_pool_id,
|
||||
Username=username,
|
||||
TemporaryPassword=temporary_password,
|
||||
)
|
||||
result = cognito.admin_initiate_auth(
|
||||
UserPoolId=user_pool_id,
|
||||
ClientId=client_id,
|
||||
AuthFlow="ADMIN_NO_SRP_AUTH",
|
||||
AuthParameters={"USERNAME": username, "PASSWORD": temporary_password},
|
||||
)
|
||||
|
||||
# A newly created user is forced to set a new password
|
||||
# This sets a new password and logs the user in (creates tokens)
|
||||
password = "1F@kePassword"
|
||||
result = cognito.respond_to_auth_challenge(
|
||||
Session=result["Session"],
|
||||
ClientId=client_id,
|
||||
ChallengeName="NEW_PASSWORD_REQUIRED",
|
||||
ChallengeResponses={"USERNAME": username, "NEW_PASSWORD": password},
|
||||
)
|
||||
#
|
||||
id_token = result["AuthenticationResult"]["IdToken"]
|
||||
|
||||
# Verify the KID header is present in the token, and corresponds to the KID supplied by the public JWT
|
||||
verify_kid_header(id_token)
|
||||
|
||||
|
||||
def verify_kid_header(token):
|
||||
"""Verifies the kid-header is corresponds with the public key"""
|
||||
headers = jwt.get_unverified_headers(token)
|
||||
kid = headers["kid"]
|
||||
|
||||
key_index = -1
|
||||
keys = fetch_public_keys()
|
||||
for i in range(len(keys)):
|
||||
if kid == keys[i]["kid"]:
|
||||
key_index = i
|
||||
break
|
||||
if key_index == -1:
|
||||
raise Exception("Public key (kid) not found in jwks.json")
|
||||
|
||||
|
||||
def fetch_public_keys():
|
||||
keys_url = "https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json".format(
|
||||
"us-west-2", "someuserpoolid"
|
||||
)
|
||||
response = requests.get(keys_url).json()
|
||||
return response["keys"]
|
||||
|
Loading…
Reference in New Issue
Block a user