Add support for admin_user_global_sign_out to cognitoidp (#4092)
* Add support for admin_user_global_sign_out to cognitoidp
This commit is contained in:
parent
f849842414
commit
0f8da52196
@ -900,6 +900,19 @@ class CognitoIdpBackend(BaseBackend):
|
|||||||
user = user_pool.users[username]
|
user = user_pool.users[username]
|
||||||
user.update_attributes(attributes)
|
user.update_attributes(attributes)
|
||||||
|
|
||||||
|
def admin_user_global_sign_out(self, user_pool_id, username):
|
||||||
|
user_pool = self.user_pools.get(user_pool_id)
|
||||||
|
if not user_pool:
|
||||||
|
raise ResourceNotFoundError(user_pool_id)
|
||||||
|
|
||||||
|
if username not in user_pool.users:
|
||||||
|
raise UserNotFoundError(username)
|
||||||
|
|
||||||
|
for token, token_tuple in list(user_pool.refresh_tokens.items()):
|
||||||
|
_, username = token_tuple
|
||||||
|
if username == username:
|
||||||
|
user_pool.refresh_tokens[token] = None
|
||||||
|
|
||||||
def create_resource_server(self, user_pool_id, identifier, name, scopes):
|
def create_resource_server(self, user_pool_id, identifier, name, scopes):
|
||||||
user_pool = self.user_pools.get(user_pool_id)
|
user_pool = self.user_pools.get(user_pool_id)
|
||||||
if not user_pool:
|
if not user_pool:
|
||||||
@ -997,6 +1010,9 @@ class CognitoIdpBackend(BaseBackend):
|
|||||||
if not refresh_token:
|
if not refresh_token:
|
||||||
raise ResourceNotFoundError(refresh_token)
|
raise ResourceNotFoundError(refresh_token)
|
||||||
|
|
||||||
|
if user_pool.refresh_tokens[refresh_token] is None:
|
||||||
|
raise NotAuthorizedError("Refresh Token has been revoked")
|
||||||
|
|
||||||
client_id, username = user_pool.refresh_tokens[refresh_token]
|
client_id, username = user_pool.refresh_tokens[refresh_token]
|
||||||
if not username:
|
if not username:
|
||||||
raise ResourceNotFoundError(username)
|
raise ResourceNotFoundError(username)
|
||||||
|
@ -422,6 +422,14 @@ class CognitoIdpResponse(BaseResponse):
|
|||||||
)
|
)
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
def admin_user_global_sign_out(self):
|
||||||
|
user_pool_id = self._get_param("UserPoolId")
|
||||||
|
username = self._get_param("Username")
|
||||||
|
cognitoidp_backends[self.region].admin_user_global_sign_out(
|
||||||
|
user_pool_id, username
|
||||||
|
)
|
||||||
|
return ""
|
||||||
|
|
||||||
# Resource Server
|
# Resource Server
|
||||||
def create_resource_server(self):
|
def create_resource_server(self):
|
||||||
user_pool_id = self._get_param("UserPoolId")
|
user_pool_id = self._get_param("UserPoolId")
|
||||||
|
@ -1661,6 +1661,53 @@ def test_confirm_forgot_password():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_cognitoidp
|
||||||
|
def test_admin_user_global_sign_out():
|
||||||
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
result = user_authentication_flow(conn)
|
||||||
|
|
||||||
|
conn.admin_user_global_sign_out(
|
||||||
|
UserPoolId=result["user_pool_id"], Username=result["username"],
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.initiate_auth(
|
||||||
|
ClientId=result["client_id"],
|
||||||
|
AuthFlow="REFRESH_TOKEN",
|
||||||
|
AuthParameters={
|
||||||
|
"REFRESH_TOKEN": result["refresh_token"],
|
||||||
|
"SECRET_HASH": result["secret_hash"],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
err = ex.value.response["Error"]
|
||||||
|
err["Code"].should.equal("NotAuthorizedException")
|
||||||
|
err["Message"].should.equal("Refresh Token has been revoked")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_cognitoidp
|
||||||
|
def test_admin_user_global_sign_out_unknown_userpool():
|
||||||
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
result = user_authentication_flow(conn)
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.admin_user_global_sign_out(
|
||||||
|
UserPoolId="n/a", Username=result["username"],
|
||||||
|
)
|
||||||
|
err = ex.value.response["Error"]
|
||||||
|
err["Code"].should.equal("ResourceNotFoundException")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_cognitoidp
|
||||||
|
def test_admin_user_global_sign_out_unknown_user():
|
||||||
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
result = user_authentication_flow(conn)
|
||||||
|
with pytest.raises(ClientError) as ex:
|
||||||
|
conn.admin_user_global_sign_out(
|
||||||
|
UserPoolId=result["user_pool_id"], Username="n/a",
|
||||||
|
)
|
||||||
|
err = ex.value.response["Error"]
|
||||||
|
err["Code"].should.equal("UserNotFoundException")
|
||||||
|
|
||||||
|
|
||||||
@mock_cognitoidp
|
@mock_cognitoidp
|
||||||
def test_admin_update_user_attributes():
|
def test_admin_update_user_attributes():
|
||||||
conn = boto3.client("cognito-idp", "us-west-2")
|
conn = boto3.client("cognito-idp", "us-west-2")
|
||||||
|
Loading…
Reference in New Issue
Block a user