moto/moto/elasticache/models.py
2023-02-24 09:29:21 -01:00

97 lines
2.9 KiB
Python

from typing import List, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from .exceptions import UserAlreadyExists, UserNotFound
class User(BaseModel):
    """In-memory representation of a single ElastiCache user."""

    def __init__(
        self,
        account_id: str,
        region: str,
        user_id: str,
        user_name: str,
        access_string: str,
        engine: str,
        no_password_required: bool,
        passwords: Optional[List[str]] = None,
    ):
        """
        Store the supplied user properties and derive the user's ARN.

        ``passwords`` may be omitted (or empty), in which case the user is
        stored with an empty password list.
        """
        # Identity; the ARN below is assembled from these values.
        self.id = user_id
        self.name = user_name
        self.region = region
        self.arn = f"arn:aws:elasticache:{self.region}:{account_id}:user:{self.id}"

        # Caller-supplied settings.
        self.engine = engine
        self.access_string = access_string
        self.no_password_required = no_password_required
        self.passwords = passwords if passwords else []

        # Fixed initial values: every user starts out "active", with a
        # minimum engine version of "6.0" and no user-group ids.
        self.status = "active"
        self.minimum_engine_version = "6.0"
        self.usergroupids: List[str] = []
class ElastiCacheBackend(BaseBackend):
    """Implementation of ElastiCache APIs."""

    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        # The registry maps user id -> User and is seeded with a user whose
        # id and name are both "default".
        default_user = User(
            account_id=self.account_id,
            region=self.region_name,
            user_id="default",
            user_name="default",
            engine="redis",
            access_string="on ~* +@all",
            no_password_required=True,
        )
        self.users = {"default": default_user}

    def create_user(
        self,
        user_id: str,
        user_name: str,
        engine: str,
        passwords: List[str],
        access_string: str,
        no_password_required: bool,
    ) -> User:
        """
        Register a new user under ``user_id`` and return it.

        Raises UserAlreadyExists when ``user_id`` is already registered.
        """
        if user_id in self.users:
            raise UserAlreadyExists

        created = User(
            account_id=self.account_id,
            region=self.region_name,
            user_id=user_id,
            user_name=user_name,
            engine=engine,
            passwords=passwords,
            access_string=access_string,
            no_password_required=no_password_required,
        )
        self.users[user_id] = created
        return created

    def delete_user(self, user_id: str) -> User:
        """
        Flag the user as "deleting" and return it.

        The user stays in the registry; only an "active" status is changed.
        Raises UserNotFound when ``user_id`` is not registered.
        """
        if user_id not in self.users:
            raise UserNotFound(user_id)

        target = self.users[user_id]
        if target.status == "active":
            target.status = "deleting"
        return target

    def describe_users(self, user_id: Optional[str]) -> List[User]:
        """
        Only the `user_id` parameter is currently supported.
        Pagination is not yet implemented.
        """
        # No (or empty) id: return every registered user.
        if not user_id:
            return list(self.users.values())

        if user_id not in self.users:
            raise UserNotFound(user_id)

        found = self.users[user_id]
        # A user flagged as "deleting" is dropped from the registry when it
        # is looked up by id, but is still returned for this call.
        if found.status == "deleting":
            del self.users[user_id]
        return [found]
# Module-level registry of ElastiCacheBackend instances, built by BackendDict
# for the "elasticache" service name.
# NOTE(review): presumably indexed by account id and region -- confirm against BackendDict.
elasticache_backends = BackendDict(ElastiCacheBackend, "elasticache")