diff --git a/moto/cognitoidp/models.py b/moto/cognitoidp/models.py index f16bfd081..b2e2bf985 100644 --- a/moto/cognitoidp/models.py +++ b/moto/cognitoidp/models.py @@ -136,6 +136,7 @@ class CognitoIdpUserPool(BaseModel): "token_use": token_use, "auth_time": now, "exp": now + expires_in, + "email": self.users[username].username, } payload.update(extra_data) headers = {"kid": "dummy"} # KID as present in jwks-public.json @@ -657,6 +658,10 @@ class CognitoIdpBackend(BaseBackend): UserStatus["FORCE_CHANGE_PASSWORD"], attributes, ) + user.attributes.append({"Name": "sub", "Value": user.id}) + user.attributes.append({"Name": "email_verified", "Value": True}) + user.attributes.append({"Name": "name", "Value": ""}) + user.attributes.append({"Name": "family_name", "Value": ""}) user_pool.users[user.username] = user return user @@ -670,6 +675,20 @@ class CognitoIdpBackend(BaseBackend): return user_pool.users[username] + def get_user(self, access_token): + for user_pool in self.user_pools.values(): + if access_token in user_pool.access_tokens: + _, username = user_pool.access_tokens[access_token] + user = user_pool.users.get(username) + if ( + not user + or not user.enabled + or user.status != UserStatus["CONFIRMED"] + ): + raise NotAuthorizedError("username") + return user + raise NotAuthorizedError("Invalid token") + @paginate(60, "pagination_token", "limit") def list_users(self, user_pool_id, pagination_token=None, limit=None): user_pool = self.user_pools.get(user_pool_id) diff --git a/moto/cognitoidp/responses.py b/moto/cognitoidp/responses.py index 4dab0d354..ced4930ae 100644 --- a/moto/cognitoidp/responses.py +++ b/moto/cognitoidp/responses.py @@ -313,6 +313,11 @@ class CognitoIdpResponse(BaseResponse): user = cognitoidp_backends[self.region].admin_get_user(user_pool_id, username) return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes")) + def get_user(self): + access_token = self._get_param("AccessToken") + user = cognitoidp_backends[self.region].get_user(access_token=access_token) + return json.dumps(user.to_json(extended=True, attributes_key="UserAttributes")) + def list_users(self): user_pool_id = self._get_param("UserPoolId") limit = self._get_param("Limit") diff --git a/tests/test_cognitoidp/test_cognitoidp.py b/tests/test_cognitoidp/test_cognitoidp.py index 08176b9be..de7936071 100644 --- a/tests/test_cognitoidp/test_cognitoidp.py +++ b/tests/test_cognitoidp/test_cognitoidp.py @@ -1,22 +1,24 @@ from __future__ import unicode_literals +import base64 +import boto3 import json import os import random import re + +import moto.cognitoidp.models +import requests import hmac import hashlib -import base64 - -import requests import uuid -import boto3 # noinspection PyUnresolvedReferences import sure # noqa from botocore.exceptions import ClientError, ParamValidationError from jose import jws, jwk, jwt +from unittest import SkipTest import pytest from moto import mock_cognitoidp, settings @@ -1042,9 +1044,17 @@ def test_admin_create_user(): result["User"]["Username"].should.equal(username) result["User"]["UserStatus"].should.equal("FORCE_CHANGE_PASSWORD") - result["User"]["Attributes"].should.have.length_of(1) - result["User"]["Attributes"][0]["Name"].should.equal("thing") - result["User"]["Attributes"][0]["Value"].should.equal(value) + result["User"]["Attributes"].should.have.length_of(5) + + def _verify_attribute(name, v): + attr = [a for a in result["User"]["Attributes"] if a["Name"] == name] + attr.should.have.length_of(1) + attr[0]["Value"].should.equal(v) + + _verify_attribute("thing", value) + _verify_attribute("name", "") + _verify_attribute("family_name", "") + _verify_attribute("email_verified", True) result["User"]["Enabled"].should.equal(True) @@ -1138,9 +1148,7 @@ def test_admin_get_user(): result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) result["Username"].should.equal(username) - result["UserAttributes"].should.have.length_of(1) - result["UserAttributes"][0]["Name"].should.equal("thing") - result["UserAttributes"][0]["Value"].should.equal(value) + result["UserAttributes"].should.have.length_of(5) @mock_cognitoidp @@ -1159,6 +1167,34 @@ def test_admin_get_missing_user(): caught.should.be.true +@mock_cognitoidp +def test_get_user(): + conn = boto3.client("cognito-idp", "us-west-2") + outputs = authentication_flow(conn, "ADMIN_NO_SRP_AUTH") + result = conn.get_user(AccessToken=outputs["access_token"]) + result["Username"].should.equal(outputs["username"]) + result["UserAttributes"].should.have.length_of(5) + + def _verify_attribute(name, v): + attr = [a for a in result["UserAttributes"] if a["Name"] == name] + attr.should.have.length_of(1) + attr[0]["Value"].should.equal(v) + + _verify_attribute("name", "") + _verify_attribute("family_name", "") + _verify_attribute("email_verified", True) + + +@mock_cognitoidp +def test_get_user_unknown_accesstoken(): + conn = boto3.client("cognito-idp", "us-west-2") + with pytest.raises(ClientError) as ex: + conn.get_user(AccessToken="n/a") + err = ex.value.response["Error"] + err["Code"].should.equal("NotAuthorizedException") + err["Message"].should.equal("Invalid token") + + @mock_cognitoidp def test_list_users(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1190,6 +1226,24 @@ def test_list_users(): result["Users"][0]["Username"].should.equal(username_bis) +@mock_cognitoidp +def test_get_user_unconfirmed(): + if settings.TEST_SERVER_MODE: + raise SkipTest("Cant patch attributes in server mode.") + conn = boto3.client("cognito-idp", "us-west-2") + outputs = authentication_flow(conn, "ADMIN_NO_SRP_AUTH") + + backend = moto.cognitoidp.models.cognitoidp_backends["us-west-2"] + user_pool = backend.user_pools[outputs["user_pool_id"]] + user_pool.users[outputs["username"]].status = "UNCONFIRMED" + + with pytest.raises(ClientError) as ex: + conn.get_user(AccessToken=outputs["access_token"]) + err = ex.value.response["Error"] + err["Code"].should.equal("NotAuthorizedException") + err["Message"].should.equal("username") + + @mock_cognitoidp def test_list_users_returns_limit_items(): conn = boto3.client("cognito-idp", "us-west-2") @@ -1967,9 +2021,7 @@ def test_admin_set_user_password(): ) result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) result["Username"].should.equal(username) - result["UserAttributes"].should.have.length_of(1) - result["UserAttributes"][0]["Name"].should.equal("thing") - result["UserAttributes"][0]["Value"].should.equal(value) + result["UserAttributes"].should.have.length_of(5) result["UserStatus"].should.equal("CONFIRMED")