moto/moto/ssm/models.py

145 lines
4.3 KiB
Python

from __future__ import unicode_literals
from collections import defaultdict
from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends
import time
class Parameter(BaseModel):
def __init__(self, name, value, type, description, keyid, last_modified_date, version):
self.name = name
self.type = type
self.description = description
self.keyid = keyid
self.last_modified_date = last_modified_date
self.version = version
if self.type == 'SecureString':
self.value = self.encrypt(value)
else:
self.value = value
def encrypt(self, value):
return 'kms:{}:'.format(self.keyid or 'default') + value
def decrypt(self, value):
if self.type != 'SecureString':
return value
prefix = 'kms:{}:'.format(self.keyid or 'default')
if value.startswith(prefix):
return value[len(prefix):]
def response_object(self, decrypt=False):
r = {
'Name': self.name,
'Type': self.type,
'Value': self.decrypt(self.value) if decrypt else self.value,
'Version': self.version,
}
return r
def describe_response_object(self, decrypt=False):
r = self.response_object(decrypt)
r['LastModifiedDate'] = int(self.last_modified_date)
r['LastModifiedUser'] = 'N/A'
if self.description:
r['Description'] = self.description
if self.keyid:
r['KeyId'] = self.keyid
return r
class SimpleSystemManagerBackend(BaseBackend):
def __init__(self):
self._parameters = {}
self._resource_tags = defaultdict(lambda: defaultdict(dict))
def delete_parameter(self, name):
try:
del self._parameters[name]
except KeyError:
pass
def delete_parameters(self, names):
result = []
for name in names:
try:
del self._parameters[name]
result.append(name)
except KeyError:
pass
return result
def get_all_parameters(self):
result = []
for k, _ in self._parameters.items():
result.append(self._parameters[k])
return result
def get_parameters(self, names, with_decryption):
result = []
for name in names:
if name in self._parameters:
result.append(self._parameters[name])
return result
def get_parameters_by_path(self, path, with_decryption, recursive):
"""Implement the get-parameters-by-path-API in the backend."""
result = []
# path could be with or without a trailing /. we handle this
# difference here.
path = path.rstrip('/') + '/'
for param in self._parameters:
if not param.startswith(path):
continue
if '/' in param[len(path) + 1:] and not recursive:
continue
result.append(self._parameters[param])
return result
def get_parameter(self, name, with_decryption):
if name in self._parameters:
return self._parameters[name]
return None
def put_parameter(self, name, description, value, type, keyid, overwrite):
previous_parameter = self._parameters.get(name)
version = 1
if previous_parameter:
version = previous_parameter.version + 1
if not overwrite:
return
last_modified_date = time.time()
self._parameters[name] = Parameter(
name, value, type, description, keyid, last_modified_date, version)
def add_tags_to_resource(self, resource_type, resource_id, tags):
for key, value in tags.items():
self._resource_tags[resource_type][resource_id][key] = value
def remove_tags_from_resource(self, resource_type, resource_id, keys):
tags = self._resource_tags[resource_type][resource_id]
for key in keys:
if key in tags:
del tags[key]
def list_tags_for_resource(self, resource_type, resource_id):
return self._resource_tags[resource_type][resource_id]
ssm_backends = {}
for region, ec2_backend in ec2_backends.items():
ssm_backends[region] = SimpleSystemManagerBackend()