Merge pull request #2534 from william-richard/add-ssm-get-parameter-history-support

Add ssm `get_parameter_history` support
This commit is contained in:
Steve Pulec 2019-11-04 22:54:57 -06:00 committed by GitHub
commit 1f9208e19c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 138 additions and 23 deletions

View File

@ -259,7 +259,10 @@ class Command(BaseModel):
class SimpleSystemManagerBackend(BaseBackend):
def __init__(self):
self._parameters = {}
# each value is a list of all of the versions for a parameter
# to get the current value, grab the last item of the list
self._parameters = defaultdict(list)
self._resource_tags = defaultdict(lambda: defaultdict(dict))
self._commands = []
self._errors = []
@ -294,8 +297,8 @@ class SimpleSystemManagerBackend(BaseBackend):
self._validate_parameter_filters(parameter_filters, by_path=False)
result = []
for param in self._parameters:
ssm_parameter = self._parameters[param]
for param_name in self._parameters:
ssm_parameter = self.get_parameter(param_name, False)
if not self._match_filters(ssm_parameter, parameter_filters):
continue
@ -504,7 +507,7 @@ class SimpleSystemManagerBackend(BaseBackend):
result = []
for name in names:
if name in self._parameters:
result.append(self._parameters[name])
result.append(self.get_parameter(name, with_decryption))
return result
def get_parameters_by_path(self, path, with_decryption, recursive, filters=None):
@ -513,17 +516,24 @@ class SimpleSystemManagerBackend(BaseBackend):
# path could be with or without a trailing /. we handle this
# difference here.
path = path.rstrip("/") + "/"
for param in self._parameters:
if path != "/" and not param.startswith(path):
for param_name in self._parameters:
if path != "/" and not param_name.startswith(path):
continue
if "/" in param[len(path) + 1 :] and not recursive:
if "/" in param_name[len(path) + 1 :] and not recursive:
continue
if not self._match_filters(self._parameters[param], filters):
if not self._match_filters(
self.get_parameter(param_name, with_decryption), filters
):
continue
result.append(self._parameters[param])
result.append(self.get_parameter(param_name, with_decryption))
return result
def get_parameter_history(self, name, with_decryption):
if name in self._parameters:
return self._parameters[name]
return None
def _match_filters(self, parameter, filters=None):
"""Return True if the given parameter matches all the filters"""
for filter_obj in filters or []:
@ -579,31 +589,35 @@ class SimpleSystemManagerBackend(BaseBackend):
def get_parameter(self, name, with_decryption):
if name in self._parameters:
return self._parameters[name]
return self._parameters[name][-1]
return None
def put_parameter(
self, name, description, value, type, allowed_pattern, keyid, overwrite
):
previous_parameter = self._parameters.get(name)
version = 1
if previous_parameter:
previous_parameter_versions = self._parameters[name]
if len(previous_parameter_versions) == 0:
previous_parameter = None
version = 1
else:
previous_parameter = previous_parameter_versions[-1]
version = previous_parameter.version + 1
if not overwrite:
return
last_modified_date = time.time()
self._parameters[name] = Parameter(
name,
value,
type,
description,
allowed_pattern,
keyid,
last_modified_date,
version,
self._parameters[name].append(
Parameter(
name,
value,
type,
description,
allowed_pattern,
keyid,
last_modified_date,
version,
)
)
return version

View File

@ -139,6 +139,28 @@ class SimpleSystemManagerResponse(BaseResponse):
response = {"Version": result}
return json.dumps(response)
def get_parameter_history(self):
name = self._get_param("Name")
with_decryption = self._get_param("WithDecryption")
result = self.ssm_backend.get_parameter_history(name, with_decryption)
if result is None:
error = {
"__type": "ParameterNotFound",
"message": "Parameter {0} not found.".format(name),
}
return json.dumps(error), dict(status=400)
response = {"Parameters": []}
for parameter_version in result:
param_data = parameter_version.describe_response_object(
decrypt=with_decryption
)
response["Parameters"].append(param_data)
return json.dumps(response)
def add_tags_to_resource(self):
resource_id = self._get_param("ResourceId")
resource_type = self._get_param("ResourceType")

View File

@ -814,6 +814,85 @@ def test_put_parameter_secure_custom_kms():
response["Parameters"][0]["Type"].should.equal("SecureString")
@mock_ssm
def test_get_parameter_history():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
for i in range(3):
client.put_parameter(
Name=test_parameter_name,
Description="A test parameter version %d" % i,
Value="value-%d" % i,
Type="String",
Overwrite=True,
)
response = client.get_parameter_history(Name=test_parameter_name)
parameters_response = response["Parameters"]
for index, param in enumerate(parameters_response):
param["Name"].should.equal(test_parameter_name)
param["Type"].should.equal("String")
param["Value"].should.equal("value-%d" % index)
param["Version"].should.equal(index + 1)
param["Description"].should.equal("A test parameter version %d" % index)
len(parameters_response).should.equal(3)
@mock_ssm
def test_get_parameter_history_with_secure_string():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
for i in range(3):
client.put_parameter(
Name=test_parameter_name,
Description="A test parameter version %d" % i,
Value="value-%d" % i,
Type="SecureString",
Overwrite=True,
)
for with_decryption in [True, False]:
response = client.get_parameter_history(
Name=test_parameter_name, WithDecryption=with_decryption
)
parameters_response = response["Parameters"]
for index, param in enumerate(parameters_response):
param["Name"].should.equal(test_parameter_name)
param["Type"].should.equal("SecureString")
expected_plaintext_value = "value-%d" % index
if with_decryption:
param["Value"].should.equal(expected_plaintext_value)
else:
param["Value"].should.equal(
"kms:alias/aws/ssm:%s" % expected_plaintext_value
)
param["Version"].should.equal(index + 1)
param["Description"].should.equal("A test parameter version %d" % index)
len(parameters_response).should.equal(3)
@mock_ssm
def test_get_parameter_history_missing_parameter():
client = boto3.client("ssm", region_name="us-east-1")
try:
client.get_parameter_history(Name="test_noexist")
raise RuntimeError("Should have failed")
except botocore.exceptions.ClientError as err:
err.operation_name.should.equal("GetParameterHistory")
err.response["Error"]["Message"].should.equal(
"Parameter test_noexist not found."
)
@mock_ssm
def test_add_remove_list_tags_for_resource():
client = boto3.client("ssm", region_name="us-east-1")