Support --filters option in secretsmanager:ListSecrets (#3173)

* Feature: Support --filters opton in secretsmanager:ListSecrets

* Implement some of the secret filters

* Check listSecrets filters combine with an implicit AND operator

* Test all filter and multi-value filter and multi-word filter

* Fix matcher behavior, restructure code

* Implement remaining listSecrets filter cases

* Linter fixes

* Use contains-in-any-order assertions for test_list_secrets

* Linter fix again

* Attempt Python 2 fix for assert_items_equal

* Remove docstrings from test_list_secrets tests as they make the test reports weird

* Test and handle listSecrets filter with no values
This commit is contained in:
Chris Kilding 2020-07-31 15:31:18 +01:00 committed by GitHub
parent a9ac09952b
commit 943ecb7ea7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 377 additions and 57 deletions

View File

@ -57,3 +57,8 @@ class InvalidRequestException(SecretsManagerClientError):
super(InvalidRequestException, self).__init__(
"InvalidRequestException", message
)
class ValidationException(SecretsManagerClientError):
def __init__(self, message):
super(ValidationException, self).__init__("ValidationException", message)

View File

@ -0,0 +1,44 @@
def _matcher(pattern, str):
for word in pattern.split(" "):
if word not in str:
return False
return True
def name(secret, names):
for n in names:
if _matcher(n, secret["name"]):
return True
return False
def description(secret, descriptions):
for d in descriptions:
if _matcher(d, secret["description"]):
return True
return False
def tag_key(secret, tag_keys):
for k in tag_keys:
for tag in secret["tags"]:
if _matcher(k, tag["Key"]):
return True
return False
def tag_value(secret, tag_values):
for v in tag_values:
for tag in secret["tags"]:
if _matcher(v, tag["Value"]):
return True
return False
def all(secret, values):
return (
name(secret, values)
or description(secret, values)
or tag_key(secret, values)
or tag_value(secret, values)
)

View File

@ -18,6 +18,31 @@ from .exceptions import (
ClientError,
)
from .utils import random_password, secret_arn, get_secret_name_from_arn
from .list_secrets.filters import all, tag_key, tag_value, description, name
_filter_functions = {
"all": all,
"name": name,
"description": description,
"tag-key": tag_key,
"tag-value": tag_value,
}
def filter_keys():
return list(_filter_functions.keys())
def _matches(secret, filters):
is_match = True
for f in filters:
# Filter names are pre-validated in the resource layer
filter_function = _filter_functions.get(f["Key"])
is_match = is_match and filter_function(secret, f["Values"])
return is_match
class SecretsManager(BaseModel):
@ -442,35 +467,35 @@ class SecretsManagerBackend(BaseBackend):
return response
def list_secrets(self, max_results, next_token):
def list_secrets(self, filters, max_results, next_token):
# TODO implement pagination and limits
secret_list = []
for secret in self.secrets.values():
if _matches(secret, filters):
versions_to_stages = {}
for version_id, version in secret["versions"].items():
versions_to_stages[version_id] = version["version_stages"]
versions_to_stages = {}
for version_id, version in secret["versions"].items():
versions_to_stages[version_id] = version["version_stages"]
secret_list.append(
{
"ARN": secret_arn(self.region, secret["secret_id"]),
"DeletedDate": secret.get("deleted_date", None),
"Description": secret.get("description", ""),
"KmsKeyId": "",
"LastAccessedDate": None,
"LastChangedDate": None,
"LastRotatedDate": None,
"Name": secret["name"],
"RotationEnabled": secret["rotation_enabled"],
"RotationLambdaARN": secret["rotation_lambda_arn"],
"RotationRules": {
"AutomaticallyAfterDays": secret["auto_rotate_after_days"]
},
"SecretVersionsToStages": versions_to_stages,
"Tags": secret["tags"],
}
)
secret_list.append(
{
"ARN": secret_arn(self.region, secret["secret_id"]),
"DeletedDate": secret.get("deleted_date", None),
"Description": secret.get("description", ""),
"KmsKeyId": "",
"LastAccessedDate": None,
"LastChangedDate": None,
"LastRotatedDate": None,
"Name": secret["name"],
"RotationEnabled": secret["rotation_enabled"],
"RotationLambdaARN": secret["rotation_lambda_arn"],
"RotationRules": {
"AutomaticallyAfterDays": secret["auto_rotate_after_days"]
},
"SecretVersionsToStages": versions_to_stages,
"Tags": secret["tags"],
}
)
return secret_list, None

View File

@ -1,13 +1,36 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.secretsmanager.exceptions import InvalidRequestException
from moto.secretsmanager.exceptions import (
InvalidRequestException,
InvalidParameterException,
ValidationException,
)
from .models import secretsmanager_backends
from .models import secretsmanager_backends, filter_keys
import json
def _validate_filters(filters):
for idx, f in enumerate(filters):
filter_key = f.get("Key", None)
filter_values = f.get("Values", None)
if filter_key is None:
raise InvalidParameterException("Invalid filter key")
if filter_key not in filter_keys():
raise ValidationException(
"1 validation error detected: Value '{}' at 'filters.{}.member.key' failed to satisfy constraint: "
"Member must satisfy enum value set: [all, name, tag-key, description, tag-value]".format(
filter_key, idx + 1
)
)
if filter_values is None:
raise InvalidParameterException(
"Invalid filter values for key: {}".format(filter_key)
)
class SecretsManagerResponse(BaseResponse):
def get_secret_value(self):
secret_id = self._get_param("SecretId")
@ -102,10 +125,12 @@ class SecretsManagerResponse(BaseResponse):
)
def list_secrets(self):
filters = self._get_param("Filters", if_none=[])
_validate_filters(filters)
max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken")
secret_list, next_token = secretsmanager_backends[self.region].list_secrets(
max_results=max_results, next_token=next_token
filters=filters, max_results=max_results, next_token=next_token
)
return json.dumps(dict(SecretList=secret_list, NextToken=next_token))

View File

@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import boto3
from moto import mock_secretsmanager
from botocore.exceptions import ClientError
import sure # noqa
from nose.tools import assert_raises
try:
from nose.tools import assert_items_equal
except ImportError:
from nose.tools import assert_count_equal as assert_items_equal
def boto_client():
return boto3.client("secretsmanager", region_name="us-west-2")
@mock_secretsmanager
def test_empty():
conn = boto_client()
secrets = conn.list_secrets()
assert_items_equal(secrets["SecretList"], [])
@mock_secretsmanager
def test_list_secrets():
conn = boto_client()
conn.create_secret(Name="test-secret", SecretString="foosecret")
conn.create_secret(
Name="test-secret-2",
SecretString="barsecret",
Tags=[{"Key": "a", "Value": "1"}],
)
secrets = conn.list_secrets()
assert secrets["SecretList"][0]["ARN"] is not None
assert secrets["SecretList"][0]["Name"] == "test-secret"
assert secrets["SecretList"][1]["ARN"] is not None
assert secrets["SecretList"][1]["Name"] == "test-secret-2"
assert secrets["SecretList"][1]["Tags"] == [{"Key": "a", "Value": "1"}]
@mock_secretsmanager
def test_with_name_filter():
conn = boto_client()
conn.create_secret(Name="foo", SecretString="secret")
conn.create_secret(Name="bar", SecretString="secret")
secrets = conn.list_secrets(Filters=[{"Key": "name", "Values": ["foo"]}])
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo"])
@mock_secretsmanager
def test_with_tag_key_filter():
conn = boto_client()
conn.create_secret(
Name="foo", SecretString="secret", Tags=[{"Key": "baz", "Value": "1"}]
)
conn.create_secret(Name="bar", SecretString="secret")
secrets = conn.list_secrets(Filters=[{"Key": "tag-key", "Values": ["baz"]}])
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo"])
@mock_secretsmanager
def test_with_tag_value_filter():
conn = boto_client()
conn.create_secret(
Name="foo", SecretString="secret", Tags=[{"Key": "1", "Value": "baz"}]
)
conn.create_secret(Name="bar", SecretString="secret")
secrets = conn.list_secrets(Filters=[{"Key": "tag-value", "Values": ["baz"]}])
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo"])
@mock_secretsmanager
def test_with_description_filter():
conn = boto_client()
conn.create_secret(Name="foo", SecretString="secret", Description="baz qux")
conn.create_secret(Name="bar", SecretString="secret")
secrets = conn.list_secrets(Filters=[{"Key": "description", "Values": ["baz"]}])
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo"])
@mock_secretsmanager
def test_with_all_filter():
# The 'all' filter will match a secret that contains ANY field with the criteria. In other words an implicit OR.
conn = boto_client()
conn.create_secret(Name="foo", SecretString="secret")
conn.create_secret(Name="bar", SecretString="secret", Description="foo")
conn.create_secret(
Name="baz", SecretString="secret", Tags=[{"Key": "foo", "Value": "1"}]
)
conn.create_secret(
Name="qux", SecretString="secret", Tags=[{"Key": "1", "Value": "foo"}]
)
conn.create_secret(
Name="multi", SecretString="secret", Tags=[{"Key": "foo", "Value": "foo"}]
)
conn.create_secret(Name="none", SecretString="secret")
secrets = conn.list_secrets(Filters=[{"Key": "all", "Values": ["foo"]}])
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo", "bar", "baz", "qux", "multi"])
@mock_secretsmanager
def test_with_no_filter_key():
conn = boto_client()
with assert_raises(ClientError) as ire:
conn.list_secrets(Filters=[{"Values": ["foo"]}])
ire.exception.response["Error"]["Code"].should.equal("InvalidParameterException")
ire.exception.response["Error"]["Message"].should.equal("Invalid filter key")
@mock_secretsmanager
def test_with_no_filter_values():
conn = boto_client()
conn.create_secret(Name="foo", SecretString="secret", Description="hello")
with assert_raises(ClientError) as ire:
conn.list_secrets(Filters=[{"Key": "description"}])
ire.exception.response["Error"]["Code"].should.equal("InvalidParameterException")
ire.exception.response["Error"]["Message"].should.equal(
"Invalid filter values for key: description"
)
@mock_secretsmanager
def test_with_invalid_filter_key():
conn = boto_client()
with assert_raises(ClientError) as ire:
conn.list_secrets(Filters=[{"Key": "invalid", "Values": ["foo"]}])
ire.exception.response["Error"]["Code"].should.equal("ValidationException")
ire.exception.response["Error"]["Message"].should.equal(
"1 validation error detected: Value 'invalid' at 'filters.1.member.key' failed to satisfy constraint: Member "
"must satisfy enum value set: [all, name, tag-key, description, tag-value]"
)
@mock_secretsmanager
def test_with_duplicate_filter_keys():
# Multiple filters with the same key combine with an implicit AND operator
conn = boto_client()
conn.create_secret(Name="foo", SecretString="secret", Description="one two")
conn.create_secret(Name="bar", SecretString="secret", Description="one")
conn.create_secret(Name="baz", SecretString="secret", Description="two")
conn.create_secret(Name="qux", SecretString="secret", Description="unrelated")
secrets = conn.list_secrets(
Filters=[
{"Key": "description", "Values": ["one"]},
{"Key": "description", "Values": ["two"]},
]
)
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo"])
@mock_secretsmanager
def test_with_multiple_filters():
# Multiple filters combine with an implicit AND operator
conn = boto_client()
conn.create_secret(
Name="foo", SecretString="secret", Tags=[{"Key": "right", "Value": "right"}]
)
conn.create_secret(
Name="bar", SecretString="secret", Tags=[{"Key": "right", "Value": "wrong"}]
)
conn.create_secret(
Name="baz", SecretString="secret", Tags=[{"Key": "wrong", "Value": "right"}]
)
conn.create_secret(
Name="qux", SecretString="secret", Tags=[{"Key": "wrong", "Value": "wrong"}]
)
secrets = conn.list_secrets(
Filters=[
{"Key": "tag-key", "Values": ["right"]},
{"Key": "tag-value", "Values": ["right"]},
]
)
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo"])
@mock_secretsmanager
def test_with_filter_with_multiple_values():
conn = boto_client()
conn.create_secret(Name="foo", SecretString="secret")
conn.create_secret(Name="bar", SecretString="secret")
conn.create_secret(Name="baz", SecretString="secret")
secrets = conn.list_secrets(Filters=[{"Key": "name", "Values": ["foo", "bar"]}])
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo", "bar"])
@mock_secretsmanager
def test_with_filter_with_value_with_multiple_words():
conn = boto_client()
conn.create_secret(Name="foo", SecretString="secret", Description="one two")
conn.create_secret(Name="bar", SecretString="secret", Description="one and two")
conn.create_secret(Name="baz", SecretString="secret", Description="one")
conn.create_secret(Name="qux", SecretString="secret", Description="two")
conn.create_secret(Name="none", SecretString="secret", Description="unrelated")
secrets = conn.list_secrets(Filters=[{"Key": "description", "Values": ["one two"]}])
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert_items_equal(secret_names, ["foo", "bar"])

View File

@ -459,36 +459,6 @@ def test_describe_secret_that_does_not_match():
result = conn.get_secret_value(SecretId="i-dont-match")
@mock_secretsmanager
def test_list_secrets_empty():
conn = boto3.client("secretsmanager", region_name="us-west-2")
secrets = conn.list_secrets()
assert secrets["SecretList"] == []
@mock_secretsmanager
def test_list_secrets():
conn = boto3.client("secretsmanager", region_name="us-west-2")
conn.create_secret(Name="test-secret", SecretString="foosecret")
conn.create_secret(
Name="test-secret-2",
SecretString="barsecret",
Tags=[{"Key": "a", "Value": "1"}],
)
secrets = conn.list_secrets()
assert secrets["SecretList"][0]["ARN"] is not None
assert secrets["SecretList"][0]["Name"] == "test-secret"
assert secrets["SecretList"][1]["ARN"] is not None
assert secrets["SecretList"][1]["Name"] == "test-secret-2"
assert secrets["SecretList"][1]["Tags"] == [{"Key": "a", "Value": "1"}]
@mock_secretsmanager
def test_restore_secret():
conn = boto3.client("secretsmanager", region_name="us-west-2")