CognitoIDP - allow unauthenticated requests to GetUser (#5099)

This commit is contained in:
Bert Blommers 2022-05-05 22:34:37 +00:00 committed by GitHub
parent 3f89b98889
commit c2727a7c20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 1 deletions

View File

@ -1800,6 +1800,15 @@ class GlobalCognitoIdpBackend(CognitoIdpBackend):
# Without authentication-header, we lose the context of which region the request was send to # Without authentication-header, we lose the context of which region the request was send to
# This backend will cycle through all backends as a workaround # This backend will cycle through all backends as a workaround
def _find_backend_by_access_token(self, access_token):
for region, backend in cognitoidp_backends.items():
if region == "global":
continue
for p in backend.user_pools.values():
if access_token in p.access_tokens:
return backend
return cognitoidp_backends["us-east-1"]
def _find_backend_for_clientid(self, client_id): def _find_backend_for_clientid(self, client_id):
for region, backend in cognitoidp_backends.items(): for region, backend in cognitoidp_backends.items():
if region == "global": if region == "global":
@ -1821,6 +1830,10 @@ class GlobalCognitoIdpBackend(CognitoIdpBackend):
backend = self._find_backend_for_clientid(client_id) backend = self._find_backend_for_clientid(client_id)
return backend.confirm_sign_up(client_id, username) return backend.confirm_sign_up(client_id, username)
def get_user(self, access_token):
backend = self._find_backend_by_access_token(access_token)
return backend.get_user(access_token)
cognitoidp_backends = BackendDict(CognitoIdpBackend, "cognito-idp") cognitoidp_backends = BackendDict(CognitoIdpBackend, "cognito-idp")
cognitoidp_backends["global"] = GlobalCognitoIdpBackend("global") cognitoidp_backends["global"] = GlobalCognitoIdpBackend("global")

View File

@ -346,7 +346,7 @@ class CognitoIdpResponse(BaseResponse):
def get_user(self): def get_user(self):
access_token = self._get_param("AccessToken") access_token = self._get_param("AccessToken")
user = cognitoidp_backends[self.region].get_user(access_token=access_token) user = cognitoidp_backends["global"].get_user(access_token=access_token)
return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes")) return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))
def list_users(self): def list_users(self):

View File

@ -55,3 +55,42 @@ def test_sign_up_method_without_authentication():
) )
res.status_code.should.equal(200) res.status_code.should.equal(200)
json.loads(res.data).should.have.key("UserConfirmed").equals(False) json.loads(res.data).should.have.key("UserConfirmed").equals(False)
# Confirm Sign Up User
data = {
"ClientId": client_id,
"Username": "test@gmail.com",
"ConfirmationCode": "sth",
}
res = test_client.post(
"/",
data=json.dumps(data),
headers={"X-Amz-Target": "AWSCognitoIdentityProviderService.ConfirmSignUp"},
)
# Initiate Auth
data = {
"ClientId": client_id,
"AuthFlow": "USER_PASSWORD_AUTH",
"AuthParameters": {"USERNAME": "test@gmail.com", "PASSWORD": "12345678"},
}
res = test_client.post(
"/",
data=json.dumps(data),
headers={"X-Amz-Target": "AWSCognitoIdentityProviderService.InitiateAuth"},
)
res.status_code.should.equal(200)
access_token = json.loads(res.data)["AuthenticationResult"]["AccessToken"]
# Get User
data = {"AccessToken": access_token}
res = test_client.post(
"/",
data=json.dumps(data),
headers={"X-Amz-Target": "AWSCognitoIdentityProviderService.GetUser"},
)
res.status_code.should.equal(200)
data = json.loads(res.data)
data.should.have.key("UserPoolId").equals(user_pool_id)
data.should.have.key("Username").equals("test@gmail.com")
data.should.have.key("UserStatus").equals("CONFIRMED")