CognitoIDP: get_user (#4038)

* cognito-idp get_user support

* add carium expected attributes

* CognitoIDP#get_user - Add negative tests

Co-authored-by: Lalitha Kolla <lalitha.kolla@carium.com>
This commit is contained in:
Bert Blommers 2021-06-27 15:48:31 +01:00 committed by GitHub
parent 167423777b
commit 2590bf0e80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 89 additions and 13 deletions

View File

@ -136,6 +136,7 @@ class CognitoIdpUserPool(BaseModel):
"token_use": token_use,
"auth_time": now,
"exp": now + expires_in,
"email": self.users[username].username,
}
payload.update(extra_data)
headers = {"kid": "dummy"} # KID as present in jwks-public.json
@ -657,6 +658,10 @@ class CognitoIdpBackend(BaseBackend):
UserStatus["FORCE_CHANGE_PASSWORD"],
attributes,
)
user.attributes.append({"Name": "sub", "Value": user.id})
user.attributes.append({"Name": "email_verified", "Value": True})
user.attributes.append({"Name": "name", "Value": ""})
user.attributes.append({"Name": "family_name", "Value": ""})
user_pool.users[user.username] = user
return user
@ -670,6 +675,20 @@ class CognitoIdpBackend(BaseBackend):
return user_pool.users[username]
def get_user(self, access_token):
for user_pool in self.user_pools.values():
if access_token in user_pool.access_tokens:
_, username = user_pool.access_tokens[access_token]
user = user_pool.users.get(username)
if (
not user
or not user.enabled
or user.status != UserStatus["CONFIRMED"]
):
raise NotAuthorizedError("username")
return user
raise NotAuthorizedError("Invalid token")
@paginate(60, "pagination_token", "limit")
def list_users(self, user_pool_id, pagination_token=None, limit=None):
user_pool = self.user_pools.get(user_pool_id)

View File

@ -313,6 +313,11 @@ class CognitoIdpResponse(BaseResponse):
user = cognitoidp_backends[self.region].admin_get_user(user_pool_id, username)
return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))
def get_user(self):
access_token = self._get_param("AccessToken")
user = cognitoidp_backends[self.region].get_user(access_token=access_token)
return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes"))
def list_users(self):
user_pool_id = self._get_param("UserPoolId")
limit = self._get_param("Limit")

View File

@ -1,22 +1,24 @@
from __future__ import unicode_literals
import base64
import boto3
import json
import os
import random
import re
import moto.cognitoidp.models
import requests
import hmac
import hashlib
import base64
import requests
import uuid
import boto3
# noinspection PyUnresolvedReferences
import sure # noqa
from botocore.exceptions import ClientError, ParamValidationError
from jose import jws, jwk, jwt
from unittest import SkipTest
import pytest
from moto import mock_cognitoidp, settings
@ -1042,9 +1044,17 @@ def test_admin_create_user():
result["User"]["Username"].should.equal(username)
result["User"]["UserStatus"].should.equal("FORCE_CHANGE_PASSWORD")
result["User"]["Attributes"].should.have.length_of(1)
result["User"]["Attributes"][0]["Name"].should.equal("thing")
result["User"]["Attributes"][0]["Value"].should.equal(value)
result["User"]["Attributes"].should.have.length_of(5)
def _verify_attribute(name, v):
attr = [a for a in result["User"]["Attributes"] if a["Name"] == name]
attr.should.have.length_of(1)
attr[0]["Value"].should.equal(v)
_verify_attribute("thing", value)
_verify_attribute("name", "")
_verify_attribute("family_name", "")
_verify_attribute("email_verified", True)
result["User"]["Enabled"].should.equal(True)
@ -1138,9 +1148,7 @@ def test_admin_get_user():
result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["Username"].should.equal(username)
result["UserAttributes"].should.have.length_of(1)
result["UserAttributes"][0]["Name"].should.equal("thing")
result["UserAttributes"][0]["Value"].should.equal(value)
result["UserAttributes"].should.have.length_of(5)
@mock_cognitoidp
@ -1159,6 +1167,34 @@ def test_admin_get_missing_user():
caught.should.be.true
@mock_cognitoidp
def test_get_user():
conn = boto3.client("cognito-idp", "us-west-2")
outputs = authentication_flow(conn, "ADMIN_NO_SRP_AUTH")
result = conn.get_user(AccessToken=outputs["access_token"])
result["Username"].should.equal(outputs["username"])
result["UserAttributes"].should.have.length_of(5)
def _verify_attribute(name, v):
attr = [a for a in result["UserAttributes"] if a["Name"] == name]
attr.should.have.length_of(1)
attr[0]["Value"].should.equal(v)
_verify_attribute("name", "")
_verify_attribute("family_name", "")
_verify_attribute("email_verified", True)
@mock_cognitoidp
def test_get_user_unknown_accesstoken():
conn = boto3.client("cognito-idp", "us-west-2")
with pytest.raises(ClientError) as ex:
conn.get_user(AccessToken="n/a")
err = ex.value.response["Error"]
err["Code"].should.equal("NotAuthorizedException")
err["Message"].should.equal("Invalid token")
@mock_cognitoidp
def test_list_users():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1190,6 +1226,24 @@ def test_list_users():
result["Users"][0]["Username"].should.equal(username_bis)
@mock_cognitoidp
def test_get_user_unconfirmed():
if settings.TEST_SERVER_MODE:
raise SkipTest("Cant patch attributes in server mode.")
conn = boto3.client("cognito-idp", "us-west-2")
outputs = authentication_flow(conn, "ADMIN_NO_SRP_AUTH")
backend = moto.cognitoidp.models.cognitoidp_backends["us-west-2"]
user_pool = backend.user_pools[outputs["user_pool_id"]]
user_pool.users[outputs["username"]].status = "UNCONFIRMED"
with pytest.raises(ClientError) as ex:
conn.get_user(AccessToken=outputs["access_token"])
err = ex.value.response["Error"]
err["Code"].should.equal("NotAuthorizedException")
err["Message"].should.equal("username")
@mock_cognitoidp
def test_list_users_returns_limit_items():
conn = boto3.client("cognito-idp", "us-west-2")
@ -1967,9 +2021,7 @@ def test_admin_set_user_password():
)
result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username)
result["Username"].should.equal(username)
result["UserAttributes"].should.have.length_of(1)
result["UserAttributes"][0]["Name"].should.equal("thing")
result["UserAttributes"][0]["Value"].should.equal(value)
result["UserAttributes"].should.have.length_of(5)
result["UserStatus"].should.equal("CONFIRMED")