SSM: Added support for label_parameter_version and getting labels on get_parameter_history

This commit is contained in:
Brent Driskill 2020-03-08 20:32:01 -04:00
parent 55a1500827
commit 5705695495
4 changed files with 396 additions and 3 deletions

View File

@ -22,6 +22,23 @@ class InvalidFilterValue(JsonRESTError):
def __init__(self, message):
super(InvalidFilterValue, self).__init__("InvalidFilterValue", message)
class ParameterNotFound(JsonRESTError):
code = 400
def __init__(self, message):
super(ParameterNotFound, self).__init__("ParameterNotFound", message)
class ParameterVersionNotFound(JsonRESTError):
code = 400
def __init__(self, message):
super(ParameterVersionNotFound, self).__init__("ParameterVersionNotFound", message)
class ParameterVersionLabelLimitExceeded(JsonRESTError):
code = 400
def __init__(self, message):
super(ParameterVersionLabelLimitExceeded, self).__init__("ParameterVersionLabelLimitExceeded", message)
class ValidationException(JsonRESTError):
code = 400

View File

@ -19,6 +19,9 @@ from .exceptions import (
InvalidFilterValue,
InvalidFilterOption,
InvalidFilterKey,
ParameterVersionLabelLimitExceeded,
ParameterVersionNotFound,
ParameterNotFound
)
@ -32,7 +35,7 @@ class Parameter(BaseModel):
allowed_pattern,
keyid,
last_modified_date,
version,
version
):
self.name = name
self.type = type
@ -41,6 +44,7 @@ class Parameter(BaseModel):
self.keyid = keyid
self.last_modified_date = last_modified_date
self.version = version
self.labels = []
if self.type == "SecureString":
if not self.keyid:
@ -75,7 +79,7 @@ class Parameter(BaseModel):
return r
def describe_response_object(self, decrypt=False):
def describe_response_object(self, decrypt=False, include_labels=False):
r = self.response_object(decrypt)
r["LastModifiedDate"] = round(self.last_modified_date, 3)
r["LastModifiedUser"] = "N/A"
@ -89,6 +93,9 @@ class Parameter(BaseModel):
if self.allowed_pattern:
r["AllowedPattern"] = self.allowed_pattern
if include_labels:
r["Labels"] = self.labels
return r
@ -614,6 +621,61 @@ class SimpleSystemManagerBackend(BaseBackend):
return self._parameters[name][-1]
return None
def label_parameter_version(self, name, version, labels):
previous_parameter_versions = self._parameters[name]
if not previous_parameter_versions:
raise ParameterNotFound(
"Parameter %s not found." % name
)
found_parameter = None
labels_needing_removal = []
if not version:
version = 1
for parameter in previous_parameter_versions:
if parameter.version >= version:
version = parameter.version
for parameter in previous_parameter_versions:
if parameter.version == version:
found_parameter = parameter
else:
for label in labels:
if label in parameter.labels:
labels_needing_removal.append(label)
if not found_parameter:
raise ParameterVersionNotFound(
"Systems Manager could not find version %s of %s. "
"Verify the version and try again." % (version, name)
)
labels_to_append = []
invalid_labels = []
for label in labels:
if label.startswith("aws") or label.startswith("ssm") or label[:1].isdigit() or not re.match("^[a-zA-z0-9_\.\-]*$", label):
invalid_labels.append(label)
continue
if len(label) > 100:
raise ValidationException(
"1 validation error detected: "
"Value '[%s]' at 'labels' failed to satisfy constraint: "
"Member must satisfy constraint: "
"[Member must have length less than or equal to 100, Member must have length greater than or equal to 1]" % label
)
continue
if label not in found_parameter.labels:
labels_to_append.append(label)
if (len(found_parameter.labels) + len(labels_to_append)) > 10:
raise ParameterVersionLabelLimitExceeded(
"An error occurred (ParameterVersionLabelLimitExceeded) when calling the LabelParameterVersion operation: "
"A parameter version can have maximum 10 labels."
"Move one or more labels to another version and try again."
)
found_parameter.labels = found_parameter.labels + labels_to_append
for parameter in previous_parameter_versions:
if parameter.version != version:
for label in parameter.labels[:]:
if label in labels_needing_removal:
parameter.labels.remove(label)
return [invalid_labels, version]
def put_parameter(
self, name, description, value, type, allowed_pattern, keyid, overwrite
):

View File

@ -168,12 +168,23 @@ class SimpleSystemManagerResponse(BaseResponse):
response = {"Parameters": []}
for parameter_version in result:
param_data = parameter_version.describe_response_object(
decrypt=with_decryption
decrypt=with_decryption,
include_labels=True
)
response["Parameters"].append(param_data)
return json.dumps(response)
def label_parameter_version(self):
name = self._get_param("Name")
version = self._get_param("ParameterVersion")
labels = self._get_param("Labels")
invalid_labels, version = self.ssm_backend.label_parameter_version(name, version, labels)
response = {"InvalidLabels": invalid_labels, "ParameterVersion": version}
return json.dumps(response)
def add_tags_to_resource(self):
resource_id = self._get_param("ResourceId")
resource_type = self._get_param("ResourceType")

View File

@ -897,6 +897,7 @@ def test_get_parameter_history():
param["Value"].should.equal("value-%d" % index)
param["Version"].should.equal(index + 1)
param["Description"].should.equal("A test parameter version %d" % index)
param["Labels"].should.equal([])
len(parameters_response).should.equal(3)
@ -937,6 +938,308 @@ def test_get_parameter_history_with_secure_string():
len(parameters_response).should.equal(3)
@mock_ssm
def test_label_parameter_version():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
client.put_parameter(Name=test_parameter_name, Description="A test parameter", Value="value", Type="String")
response = client.label_parameter_version(Name=test_parameter_name, Labels=["test-label"])
response["InvalidLabels"].should.equal([])
response["ParameterVersion"].should.equal(1)
@mock_ssm
def test_label_parameter_version_with_specific_version():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
client.put_parameter(Name=test_parameter_name, Description="A test parameter", Value="value", Type="String")
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=["test-label"])
response["InvalidLabels"].should.equal([])
response["ParameterVersion"].should.equal(1)
@mock_ssm
def test_label_parameter_version_twice():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
test_labels = ["test-label"]
client.put_parameter(Name=test_parameter_name, Description="A test parameter", Value="value", Type="String")
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=test_labels)
response["InvalidLabels"].should.equal([])
response["ParameterVersion"].should.equal(1)
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=test_labels)
response["InvalidLabels"].should.equal([])
response["ParameterVersion"].should.equal(1)
response = client.get_parameter_history(Name=test_parameter_name)
len(response["Parameters"]).should.equal(1)
response["Parameters"][0]["Labels"].should.equal(test_labels)
@mock_ssm
def test_label_parameter_moving_versions():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
test_labels = ["test-label"]
for i in range(3):
client.put_parameter(
Name=test_parameter_name,
Description="A test parameter version %d" % i,
Value="value-%d" % i,
Type="String",
Overwrite=True
)
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=test_labels)
response["InvalidLabels"].should.equal([])
response["ParameterVersion"].should.equal(1)
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=2, Labels=test_labels)
response["InvalidLabels"].should.equal([])
response["ParameterVersion"].should.equal(2)
response = client.get_parameter_history(Name=test_parameter_name)
parameters_response = response["Parameters"]
for index, param in enumerate(parameters_response):
param["Name"].should.equal(test_parameter_name)
param["Type"].should.equal("String")
param["Value"].should.equal("value-%d" % index)
param["Version"].should.equal(index + 1)
param["Description"].should.equal("A test parameter version %d" % index)
labels = test_labels if param["Version"] == 2 else []
param["Labels"].should.equal(labels)
len(parameters_response).should.equal(3)
@mock_ssm
def test_label_parameter_moving_versions_complex():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
for i in range(3):
client.put_parameter(
Name=test_parameter_name,
Description="A test parameter version %d" % i,
Value="value-%d" % i,
Type="String",
Overwrite=True
)
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=["test-label1", "test-label2", "test-label3"])
response["InvalidLabels"].should.equal([])
response["ParameterVersion"].should.equal(1)
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=2, Labels=["test-label2", "test-label3"])
response["InvalidLabels"].should.equal([])
response["ParameterVersion"].should.equal(2)
response = client.get_parameter_history(Name=test_parameter_name)
parameters_response = response["Parameters"]
for index, param in enumerate(parameters_response):
param["Name"].should.equal(test_parameter_name)
param["Type"].should.equal("String")
param["Value"].should.equal("value-%d" % index)
param["Version"].should.equal(index + 1)
param["Description"].should.equal("A test parameter version %d" % index)
labels = ["test-label2", "test-label3"] if param["Version"] == 2 else (["test-label1"] if param["Version"] == 1 else [])
param["Labels"].should.equal(labels)
len(parameters_response).should.equal(3)
@mock_ssm
def test_label_parameter_version_exception_ten_labels_at_once():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
test_labels = ["test-label1", "test-label2", "test-label3", "test-label4", "test-label5", "test-label6", "test-label7", "test-label8", "test-label9", "test-label10", "test-label11"]
client.put_parameter(Name=test_parameter_name, Description="A test parameter", Value="value", Type="String")
client.label_parameter_version.when.called_with(
Name="test", ParameterVersion=1, Labels=test_labels
).should.throw(
ClientError,
"An error occurred (ParameterVersionLabelLimitExceeded) when calling the LabelParameterVersion operation: "
"A parameter version can have maximum 10 labels."
"Move one or more labels to another version and try again."
)
@mock_ssm
def test_label_parameter_version_exception_ten_labels_over_multiple_calls():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
client.put_parameter(Name=test_parameter_name, Description="A test parameter", Value="value", Type="String")
client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=["test-label1", "test-label2", "test-label3", "test-label4", "test-label5"])
client.label_parameter_version.when.called_with(
Name="test", ParameterVersion=1, Labels=["test-label6", "test-label7", "test-label8", "test-label9", "test-label10", "test-label11"]
).should.throw(
ClientError,
"An error occurred (ParameterVersionLabelLimitExceeded) when calling the LabelParameterVersion operation: "
"A parameter version can have maximum 10 labels."
"Move one or more labels to another version and try again."
)
@mock_ssm
def test_label_parameter_version_invalid_name():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
response = client.label_parameter_version.when.called_with(
Name=test_parameter_name, Labels=["test-label"]
).should.throw(
ClientError,
"An error occurred (ParameterNotFound) when calling the LabelParameterVersion operation: "
"Parameter test not found."
)
@mock_ssm
def test_label_parameter_version_invalid_parameter_version():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
client.put_parameter(Name=test_parameter_name, Description="A test parameter", Value="value", Type="String")
response = client.label_parameter_version.when.called_with(
Name=test_parameter_name, Labels=["test-label"], ParameterVersion=5
).should.throw(
ClientError,
"An error occurred (ParameterVersionNotFound) when calling the LabelParameterVersion operation: "
"Systems Manager could not find version 5 of test. "
"Verify the version and try again."
)
@mock_ssm
def test_label_parameter_version_invalid_label():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
client.put_parameter(Name=test_parameter_name, Description="A test parameter", Value="value", Type="String")
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=["awsabc"])
response["InvalidLabels"].should.equal(["awsabc"])
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=["ssmabc"])
response["InvalidLabels"].should.equal(["ssmabc"])
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=["9abc"])
response["InvalidLabels"].should.equal(["9abc"])
response = client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=["abc/123"])
response["InvalidLabels"].should.equal(["abc/123"])
client.label_parameter_version.when.called_with(
Name=test_parameter_name, ParameterVersion=1, Labels=["a"*101]
).should.throw(
ClientError,
"1 validation error detected: "
"Value '[%s]' at 'labels' failed to satisfy constraint: "
"Member must satisfy constraint: "
"[Member must have length less than or equal to 100, Member must have length greater than or equal to 1]" % ("a"*101)
)
@mock_ssm
def test_get_parameter_history_with_label():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
test_labels = ["test-label"]
for i in range(3):
client.put_parameter(
Name=test_parameter_name,
Description="A test parameter version %d" % i,
Value="value-%d" % i,
Type="String",
Overwrite=True,
)
client.label_parameter_version(Name=test_parameter_name, ParameterVersion=1, Labels=test_labels)
response = client.get_parameter_history(Name=test_parameter_name)
parameters_response = response["Parameters"]
for index, param in enumerate(parameters_response):
param["Name"].should.equal(test_parameter_name)
param["Type"].should.equal("String")
param["Value"].should.equal("value-%d" % index)
param["Version"].should.equal(index + 1)
param["Description"].should.equal("A test parameter version %d" % index)
labels = test_labels if param["Version"] == 1 else []
param["Labels"].should.equal(labels)
len(parameters_response).should.equal(3)
@mock_ssm
def test_get_parameter_history_with_label_non_latest():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
test_labels = ["test-label"]
for i in range(3):
client.put_parameter(
Name=test_parameter_name,
Description="A test parameter version %d" % i,
Value="value-%d" % i,
Type="String",
Overwrite=True,
)
client.label_parameter_version(Name=test_parameter_name, ParameterVersion=2, Labels=test_labels)
response = client.get_parameter_history(Name=test_parameter_name)
parameters_response = response["Parameters"]
for index, param in enumerate(parameters_response):
param["Name"].should.equal(test_parameter_name)
param["Type"].should.equal("String")
param["Value"].should.equal("value-%d" % index)
param["Version"].should.equal(index + 1)
param["Description"].should.equal("A test parameter version %d" % index)
labels = test_labels if param["Version"] == 2 else []
param["Labels"].should.equal(labels)
len(parameters_response).should.equal(3)
@mock_ssm
def test_get_parameter_history_with_label_latest_assumed():
client = boto3.client("ssm", region_name="us-east-1")
test_parameter_name = "test"
test_labels = ["test-label"]
for i in range(3):
client.put_parameter(
Name=test_parameter_name,
Description="A test parameter version %d" % i,
Value="value-%d" % i,
Type="String",
Overwrite=True,
)
client.label_parameter_version(Name=test_parameter_name, Labels=test_labels)
response = client.get_parameter_history(Name=test_parameter_name)
parameters_response = response["Parameters"]
for index, param in enumerate(parameters_response):
param["Name"].should.equal(test_parameter_name)
param["Type"].should.equal("String")
param["Value"].should.equal("value-%d" % index)
param["Version"].should.equal(index + 1)
param["Description"].should.equal("A test parameter version %d" % index)
labels = test_labels if param["Version"] == 3 else []
param["Labels"].should.equal(labels)
len(parameters_response).should.equal(3)
@mock_ssm
def test_get_parameter_history_missing_parameter():