Add support for USER_PASSWORD_AUTH auth method (cognito-idp) (#4164)
Co-authored-by: Bartosz Sledz <bartosz.sledz@hitachivantara.com>
This commit is contained in:
parent
cbfb450430
commit
b3fe48ece0
@ -1005,6 +1005,39 @@ class CognitoIdpBackend(BaseBackend):
|
|||||||
"SECRET_BLOCK": session,
|
"SECRET_BLOCK": session,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
elif auth_flow == "USER_PASSWORD_AUTH":
|
||||||
|
username = auth_parameters.get("USERNAME")
|
||||||
|
password = auth_parameters.get("PASSWORD")
|
||||||
|
|
||||||
|
user = user_pool.users.get(username)
|
||||||
|
|
||||||
|
if not user:
|
||||||
|
raise UserNotFoundError(username)
|
||||||
|
|
||||||
|
if user.password != password:
|
||||||
|
raise NotAuthorizedError("Incorrect username or password.")
|
||||||
|
|
||||||
|
if user.status == UserStatus["UNCONFIRMED"]:
|
||||||
|
raise UserNotConfirmedException("User is not confirmed.")
|
||||||
|
|
||||||
|
session = str(uuid.uuid4())
|
||||||
|
self.sessions[session] = user_pool
|
||||||
|
|
||||||
|
access_token, expires_in = user_pool.create_access_token(
|
||||||
|
client_id, username
|
||||||
|
)
|
||||||
|
id_token, _ = user_pool.create_id_token(client_id, username)
|
||||||
|
refresh_token = user_pool.create_refresh_token(client_id, username)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"AuthenticationResult": {
|
||||||
|
"IdToken": id_token,
|
||||||
|
"AccessToken": access_token,
|
||||||
|
"ExpiresIn": expires_in,
|
||||||
|
"RefreshToken": refresh_token,
|
||||||
|
"TokenType": "Bearer",
|
||||||
|
}
|
||||||
|
}
|
||||||
elif auth_flow == "REFRESH_TOKEN":
|
elif auth_flow == "REFRESH_TOKEN":
|
||||||
refresh_token = auth_parameters.get("REFRESH_TOKEN")
|
refresh_token = auth_parameters.get("REFRESH_TOKEN")
|
||||||
if not refresh_token:
|
if not refresh_token:
|
||||||
|
@ -1923,6 +1923,73 @@ def test_initiate_auth_REFRESH_TOKEN():
|
|||||||
result["AuthenticationResult"]["AccessToken"].should_not.be.none
|
result["AuthenticationResult"]["AccessToken"].should_not.be.none
|
||||||
|
|
||||||
|
|
||||||
|
@mock_cognitoidp
|
||||||
|
def test_initiate_auth_USER_PASSWORD_AUTH():
|
||||||
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
result = user_authentication_flow(conn)
|
||||||
|
result = conn.initiate_auth(
|
||||||
|
ClientId=result["client_id"],
|
||||||
|
AuthFlow="USER_PASSWORD_AUTH",
|
||||||
|
AuthParameters={"USERNAME": result["username"], "PASSWORD": result["password"]},
|
||||||
|
)
|
||||||
|
|
||||||
|
result["AuthenticationResult"]["AccessToken"].should_not.be.none
|
||||||
|
result["AuthenticationResult"]["IdToken"].should_not.be.none
|
||||||
|
result["AuthenticationResult"]["RefreshToken"].should_not.be.none
|
||||||
|
|
||||||
|
|
||||||
|
@mock_cognitoidp
|
||||||
|
def test_initiate_auth_USER_PASSWORD_AUTH_user_not_found():
|
||||||
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
result = user_authentication_flow(conn)
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.initiate_auth(
|
||||||
|
ClientId=result["client_id"],
|
||||||
|
AuthFlow="USER_PASSWORD_AUTH",
|
||||||
|
AuthParameters={"USERNAME": "INVALIDUSER", "PASSWORD": result["password"]},
|
||||||
|
)
|
||||||
|
err = ex.value.response["Error"]
|
||||||
|
err["Code"].should.equal("UserNotFoundException")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_cognitoidp
|
||||||
|
def test_initiate_auth_USER_PASSWORD_AUTH_user_incorrect_password():
|
||||||
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
result = user_authentication_flow(conn)
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.initiate_auth(
|
||||||
|
ClientId=result["client_id"],
|
||||||
|
AuthFlow="USER_PASSWORD_AUTH",
|
||||||
|
AuthParameters={
|
||||||
|
"USERNAME": result["username"],
|
||||||
|
"PASSWORD": "NotAuthorizedException",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
err = ex.value.response["Error"]
|
||||||
|
err["Code"].should.equal("NotAuthorizedException")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_cognitoidp
|
||||||
|
def test_initiate_auth_USER_PASSWORD_AUTH_unconfirmed_user():
|
||||||
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
username = str(uuid.uuid4())
|
||||||
|
password = str(uuid.uuid4())
|
||||||
|
user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"]
|
||||||
|
client_id = conn.create_user_pool_client(
|
||||||
|
UserPoolId=user_pool_id, ClientName=str(uuid.uuid4()), GenerateSecret=True,
|
||||||
|
)["UserPoolClient"]["ClientId"]
|
||||||
|
conn.sign_up(ClientId=client_id, Username=username, Password=password)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.initiate_auth(
|
||||||
|
ClientId=client_id,
|
||||||
|
AuthFlow="USER_PASSWORD_AUTH",
|
||||||
|
AuthParameters={"USERNAME": username, "PASSWORD": password},
|
||||||
|
)
|
||||||
|
err = ex.value.response["Error"]
|
||||||
|
err["Code"].should.equal("UserNotConfirmedException")
|
||||||
|
|
||||||
|
|
||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
def test_initiate_auth_for_unconfirmed_user():
|
def test_initiate_auth_for_unconfirmed_user():
|
||||||
conn = boto3.client("cognito-idp", "us-west-2")
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
Loading…
Reference in New Issue
Block a user