Implement SSM Parameter Store filters support (GetParametersByPath API) (#1604)

* added tests for SSM Parameter Store filters (GetParametersByPath - ParameterStringFilter)

* implemented SSM Parameter Store filters support (only for get_parameters_by_path API)

* adding myself to authors file
This commit is contained in:
Alex Casalboni 2018-04-30 20:02:47 +02:00 committed by Jack Danger
parent ba2ea8e1b3
commit cb364eedc6
4 changed files with 117 additions and 2 deletions

View File

@ -52,3 +52,4 @@ Moto is written by Steve Pulec with contributions from:
* [Clive Li](https://github.com/cliveli)
* [Jim Shields](https://github.com/jimjshields)
* [William Richard](https://github.com/william-richard)
* [Alex Casalboni](https://github.com/alexcasalboni)

View File

@ -93,7 +93,7 @@ class SimpleSystemManagerBackend(BaseBackend):
result.append(self._parameters[name])
return result
def get_parameters_by_path(self, path, with_decryption, recursive):
def get_parameters_by_path(self, path, with_decryption, recursive, filters=None):
"""Implement the get-parameters-by-path-API in the backend."""
result = []
# path could be with or without a trailing /. we handle this
@ -104,10 +104,35 @@ class SimpleSystemManagerBackend(BaseBackend):
continue
if '/' in param[len(path) + 1:] and not recursive:
continue
if not self._match_filters(self._parameters[param], filters):
continue
result.append(self._parameters[param])
return result
@staticmethod
def _match_filters(parameter, filters=None):
"""Return True if the given parameter matches all the filters"""
for filter_obj in (filters or []):
key = filter_obj['Key']
option = filter_obj.get('Option', 'Equals')
values = filter_obj.get('Values', [])
what = None
if key == 'Type':
what = parameter.type
elif key == 'KeyId':
what = parameter.keyid
if option == 'Equals'\
and not any(what == value for value in values):
return False
elif option == 'BeginsWith'\
and not any(what.startswith(value) for value in values):
return False
# True if no false match (or no filters at all)
return True
def get_parameter(self, name, with_decryption):
if name in self._parameters:
return self._parameters[name]

View File

@ -85,9 +85,10 @@ class SimpleSystemManagerResponse(BaseResponse):
path = self._get_param('Path')
with_decryption = self._get_param('WithDecryption')
recursive = self._get_param('Recursive', False)
filters = self._get_param('ParameterFilters')
result = self.ssm_backend.get_parameters_by_path(
path, with_decryption, recursive
path, with_decryption, recursive, filters
)
response = {

View File

@ -76,6 +76,25 @@ def test_get_parameters_by_path():
Value='value4',
Type='String')
client.put_parameter(
Name='/baz/name1',
Description='A test parameter (list)',
Value='value1,value2,value3',
Type='StringList')
client.put_parameter(
Name='/baz/name2',
Description='A test parameter',
Value='value1',
Type='String')
client.put_parameter(
Name='/baz/pwd',
Description='A secure test parameter',
Value='my_secret',
Type='SecureString',
KeyId='alias/aws/ssm')
response = client.get_parameters_by_path(Path='/foo')
len(response['Parameters']).should.equal(2)
{p['Value'] for p in response['Parameters']}.should.equal(
@ -92,6 +111,75 @@ def test_get_parameters_by_path():
set(['value3', 'value4'])
)
response = client.get_parameters_by_path(Path='/baz')
len(response['Parameters']).should.equal(3)
filters = [{
'Key': 'Type',
'Option': 'Equals',
'Values': ['StringList'],
}]
response = client.get_parameters_by_path(Path='/baz', ParameterFilters=filters)
len(response['Parameters']).should.equal(1)
{p['Name'] for p in response['Parameters']}.should.equal(
set(['/baz/name1'])
)
# note: 'Option' is optional (default: 'Equals')
filters = [{
'Key': 'Type',
'Values': ['StringList'],
}]
response = client.get_parameters_by_path(Path='/baz', ParameterFilters=filters)
len(response['Parameters']).should.equal(1)
{p['Name'] for p in response['Parameters']}.should.equal(
set(['/baz/name1'])
)
filters = [{
'Key': 'Type',
'Option': 'Equals',
'Values': ['String'],
}]
response = client.get_parameters_by_path(Path='/baz', ParameterFilters=filters)
len(response['Parameters']).should.equal(1)
{p['Name'] for p in response['Parameters']}.should.equal(
set(['/baz/name2'])
)
filters = [{
'Key': 'Type',
'Option': 'Equals',
'Values': ['String', 'SecureString'],
}]
response = client.get_parameters_by_path(Path='/baz', ParameterFilters=filters)
len(response['Parameters']).should.equal(2)
{p['Name'] for p in response['Parameters']}.should.equal(
set(['/baz/name2', '/baz/pwd'])
)
filters = [{
'Key': 'Type',
'Option': 'BeginsWith',
'Values': ['String'],
}]
response = client.get_parameters_by_path(Path='/baz', ParameterFilters=filters)
len(response['Parameters']).should.equal(2)
{p['Name'] for p in response['Parameters']}.should.equal(
set(['/baz/name1', '/baz/name2'])
)
filters = [{
'Key': 'KeyId',
'Option': 'Equals',
'Values': ['alias/aws/ssm'],
}]
response = client.get_parameters_by_path(Path='/baz', ParameterFilters=filters)
len(response['Parameters']).should.equal(1)
{p['Name'] for p in response['Parameters']}.should.equal(
set(['/baz/pwd'])
)
@mock_ssm
def test_put_parameter():