Feature: Secrets Manager put_secret_value and list_secret_version_ids (#2116)

* initial work

- adding put_secret_value and list_secret_versions

* Added support for versions in all functions except rotate_secret

* more work

- refactor rotate_secret method
  - now, adds a new version of the secret and points default version id to it
- updated implementation coverage readme
- element in list check to fix unit test
- fixed linting errors
- added tests, fixed exception, failing tests still
- secrets_manager/test_server fails when running whole suite,
  but not when running that individual test file

* fixed failing test_get_secret_value

* Removed test.py. Fixed condition statement.

* fixed default stages + adding AWSPREVIOUS

* remove old AWSPREVIOUS stages
This commit is contained in:
Justin Kieber-King 2019-05-22 05:45:22 -04:00 committed by Terry Cain
parent 8f4c273095
commit bbd7fefb37
6 changed files with 427 additions and 84 deletions


@ -3658,9 +3658,9 @@
- [X] describe_secret - [X] describe_secret
- [X] get_random_password - [X] get_random_password
- [X] get_secret_value - [X] get_secret_value
- [ ] list_secret_version_ids - [X] list_secret_version_ids
- [x] list_secrets - [X] list_secrets
- [ ] put_secret_value - [X] put_secret_value
- [X] restore_secret - [X] restore_secret
- [X] rotate_secret - [X] rotate_secret
- [ ] tag_resource - [ ] tag_resource


@ -29,6 +29,14 @@ class InvalidParameterException(SecretsManagerClientError):
message) message)
class ResourceExistsException(SecretsManagerClientError):
def __init__(self, message):
super(ResourceExistsException, self).__init__(
'ResourceExistsException',
message
)
class InvalidRequestException(SecretsManagerClientError): class InvalidRequestException(SecretsManagerClientError):
def __init__(self, message): def __init__(self, message):
super(InvalidRequestException, self).__init__( super(InvalidRequestException, self).__init__(


@ -11,6 +11,7 @@ from moto.core import BaseBackend, BaseModel
from .exceptions import ( from .exceptions import (
ResourceNotFoundException, ResourceNotFoundException,
InvalidParameterException, InvalidParameterException,
ResourceExistsException,
InvalidRequestException, InvalidRequestException,
ClientError ClientError
) )
@ -47,6 +48,17 @@ class SecretsManagerBackend(BaseBackend):
if not self._is_valid_identifier(secret_id): if not self._is_valid_identifier(secret_id):
raise ResourceNotFoundException() raise ResourceNotFoundException()
if not version_id and version_stage:
# set version_id to match version_stage
versions_dict = self.secrets[secret_id]['versions']
for ver_id, ver_val in versions_dict.items():
if version_stage in ver_val['version_stages']:
version_id = ver_id
break
if not version_id:
raise ResourceNotFoundException()
# TODO check this part
if 'deleted_date' in self.secrets[secret_id]: if 'deleted_date' in self.secrets[secret_id]:
raise InvalidRequestException( raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to \ "An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to \
@ -54,42 +66,91 @@ class SecretsManagerBackend(BaseBackend):
) )
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
version_id = version_id or secret['default_version_id']
secret_version = secret['versions'][version_id]
response = json.dumps({ response = json.dumps({
"ARN": secret_arn(self.region, secret['secret_id']), "ARN": secret_arn(self.region, secret['secret_id']),
"Name": secret['name'], "Name": secret['name'],
"VersionId": secret['version_id'], "VersionId": secret_version['version_id'],
"SecretString": secret['secret_string'], "SecretString": secret_version['secret_string'],
"VersionStages": [ "VersionStages": secret_version['version_stages'],
"AWSCURRENT", "CreatedDate": secret_version['createdate'],
],
"CreatedDate": secret['createdate']
}) })
return response return response
def create_secret(self, name, secret_string, tags, **kwargs): def create_secret(self, name, secret_string, tags, **kwargs):
generated_version_id = str(uuid.uuid4()) # error if secret exists
if name in self.secrets.keys():
raise ResourceExistsException('A resource with the ID you requested already exists.')
secret = { version_id = self._add_secret(name, secret_string, tags=tags)
'secret_string': secret_string,
'secret_id': name,
'name': name,
'createdate': int(time.time()),
'rotation_enabled': False,
'rotation_lambda_arn': '',
'auto_rotate_after_days': 0,
'version_id': generated_version_id,
'tags': tags
}
self.secrets[name] = secret
response = json.dumps({ response = json.dumps({
"ARN": secret_arn(self.region, name), "ARN": secret_arn(self.region, name),
"Name": name, "Name": name,
"VersionId": generated_version_id, "VersionId": version_id,
})
return response
def _add_secret(self, secret_id, secret_string, tags=[], version_id=None, version_stages=None):
if version_stages is None:
version_stages = ['AWSCURRENT']
if not version_id:
version_id = str(uuid.uuid4())
secret_version = {
'secret_string': secret_string,
'createdate': int(time.time()),
'version_id': version_id,
'version_stages': version_stages,
}
if secret_id in self.secrets:
# remove all old AWSPREVIOUS stages
for secret_verion_to_look_at in self.secrets[secret_id]['versions'].values():
if 'AWSPREVIOUS' in secret_verion_to_look_at['version_stages']:
secret_verion_to_look_at['version_stages'].remove('AWSPREVIOUS')
# set old AWSCURRENT secret to AWSPREVIOUS
previous_current_version_id = self.secrets[secret_id]['default_version_id']
self.secrets[secret_id]['versions'][previous_current_version_id]['version_stages'] = ['AWSPREVIOUS']
self.secrets[secret_id]['versions'][version_id] = secret_version
self.secrets[secret_id]['default_version_id'] = version_id
else:
self.secrets[secret_id] = {
'versions': {
version_id: secret_version
},
'default_version_id': version_id,
}
secret = self.secrets[secret_id]
secret['secret_id'] = secret_id
secret['name'] = secret_id
secret['rotation_enabled'] = False
secret['rotation_lambda_arn'] = ''
secret['auto_rotate_after_days'] = 0
secret['tags'] = tags
return version_id
def put_secret_value(self, secret_id, secret_string, version_stages):
version_id = self._add_secret(secret_id, secret_string, version_stages=version_stages)
response = json.dumps({
'ARN': secret_arn(self.region, secret_id),
'Name': secret_id,
'VersionId': version_id,
'VersionStages': version_stages
}) })
return response return response
@ -162,17 +223,24 @@ class SecretsManagerBackend(BaseBackend):
secret = self.secrets[secret_id] secret = self.secrets[secret_id]
secret['version_id'] = client_request_token or '' old_secret_version = secret['versions'][secret['default_version_id']]
new_version_id = client_request_token or str(uuid.uuid4())
self._add_secret(secret_id, old_secret_version['secret_string'], secret['tags'], version_id=new_version_id, version_stages=['AWSCURRENT'])
secret['rotation_lambda_arn'] = rotation_lambda_arn or '' secret['rotation_lambda_arn'] = rotation_lambda_arn or ''
if rotation_rules: if rotation_rules:
secret['auto_rotate_after_days'] = rotation_rules.get(rotation_days, 0) secret['auto_rotate_after_days'] = rotation_rules.get(rotation_days, 0)
if secret['auto_rotate_after_days'] > 0: if secret['auto_rotate_after_days'] > 0:
secret['rotation_enabled'] = True secret['rotation_enabled'] = True
if 'AWSCURRENT' in old_secret_version['version_stages']:
old_secret_version['version_stages'].remove('AWSCURRENT')
response = json.dumps({ response = json.dumps({
"ARN": secret_arn(self.region, secret['secret_id']), "ARN": secret_arn(self.region, secret['secret_id']),
"Name": secret['name'], "Name": secret['name'],
"VersionId": secret['version_id'] "VersionId": new_version_id
}) })
return response return response
@ -206,28 +274,54 @@ class SecretsManagerBackend(BaseBackend):
return response return response
def list_secret_version_ids(self, secret_id):
secret = self.secrets[secret_id]
version_list = []
for version_id, version in secret['versions'].items():
version_list.append({
'CreatedDate': int(time.time()),
'LastAccessedDate': int(time.time()),
'VersionId': version_id,
'VersionStages': version['version_stages'],
})
response = json.dumps({
'ARN': secret['secret_id'],
'Name': secret['name'],
'NextToken': '',
'Versions': version_list,
})
return response
def list_secrets(self, max_results, next_token): def list_secrets(self, max_results, next_token):
# TODO implement pagination and limits # TODO implement pagination and limits
secret_list = [{ secret_list = []
"ARN": secret_arn(self.region, secret['secret_id']), for secret in self.secrets.values():
"DeletedDate": secret.get('deleted_date', None),
"Description": "", versions_to_stages = {}
"KmsKeyId": "", for version_id, version in secret['versions'].items():
"LastAccessedDate": None, versions_to_stages[version_id] = version['version_stages']
"LastChangedDate": None,
"LastRotatedDate": None, secret_list.append({
"Name": secret['name'], "ARN": secret_arn(self.region, secret['secret_id']),
"RotationEnabled": secret['rotation_enabled'], "DeletedDate": secret.get('deleted_date', None),
"RotationLambdaARN": secret['rotation_lambda_arn'], "Description": "",
"RotationRules": { "KmsKeyId": "",
"AutomaticallyAfterDays": secret['auto_rotate_after_days'] "LastAccessedDate": None,
}, "LastChangedDate": None,
"SecretVersionsToStages": { "LastRotatedDate": None,
secret['version_id']: ["AWSCURRENT"] "Name": secret['name'],
}, "RotationEnabled": secret['rotation_enabled'],
"Tags": secret['tags'] "RotationLambdaARN": secret['rotation_lambda_arn'],
} for secret in self.secrets.values()] "RotationRules": {
"AutomaticallyAfterDays": secret['auto_rotate_after_days']
},
"SecretVersionsToStages": versions_to_stages,
"Tags": secret['tags']
})
return secret_list, None return secret_list, None
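Review bot: `_add_secret` (the `@ -54,42 +66,91` hunk) declares `tags=[]` as a mutable default and then stores that very list on the secret via `secret['tags'] = tags`, so every secret first created through `put_secret_value` ends up sharing one tags list; the same tail of the function also unconditionally rewrites `rotation_enabled`, `rotation_lambda_arn`, `auto_rotate_after_days` and `tags` on an already-existing secret, which means a plain `put_secret_value` on a rotated secret silently wipes its rotation configuration and tags (only `rotate_secret` passes `secret['tags']` back in). Default `tags` to `None`, write the rotation metadata only on the create path, and leave `tags` untouched on the update path unless the caller supplied them. Separately, `list_secret_version_ids` in the last hunk indexes `self.secrets[secret_id]` without the `_is_valid_identifier` guard the other operations use, so an unknown id surfaces as a KeyError instead of `ResourceNotFoundException`, and it reports `int(time.time())` as `CreatedDate` rather than the stored `version['createdate']`.
```python
def _add_secret(self, secret_id, secret_string, tags=None, version_id=None, version_stages=None):
    # … unchanged: version_stages / version_id defaults, secret_version dict …
    if secret_id in self.secrets:
        # … unchanged: AWSPREVIOUS cleanup, AWSCURRENT -> AWSPREVIOUS, store version, bump default_version_id …
        if tags is not None:
            self.secrets[secret_id]['tags'] = tags
    else:
        self.secrets[secret_id] = {
            'versions': {version_id: secret_version},
            'default_version_id': version_id,
            'secret_id': secret_id,
            'name': secret_id,
            'rotation_enabled': False,
            'rotation_lambda_arn': '',
            'auto_rotate_after_days': 0,
            'tags': [] if tags is None else tags,
        }
    return version_id
```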


@ -67,6 +67,22 @@ class SecretsManagerResponse(BaseResponse):
rotation_rules=rotation_rules rotation_rules=rotation_rules
) )
def put_secret_value(self):
secret_id = self._get_param('SecretId', if_none='')
secret_string = self._get_param('SecretString', if_none='')
version_stages = self._get_param('VersionStages', if_none=['AWSCURRENT'])
return secretsmanager_backends[self.region].put_secret_value(
secret_id=secret_id,
secret_string=secret_string,
version_stages=version_stages,
)
def list_secret_version_ids(self):
secret_id = self._get_param('SecretId', if_none='')
return secretsmanager_backends[self.region].list_secret_version_ids(
secret_id=secret_id
)
def list_secrets(self): def list_secrets(self):
max_results = self._get_int_param("MaxResults") max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken") next_token = self._get_param("NextToken")
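Review bot: Both new handlers default a missing `SecretId` to `''` through `if_none=''` and hand it straight to the backend; in the backend `list_secret_version_ids` does `self.secrets[secret_id]` with no existence check, so a request without a `SecretId` (or with an unknown one) fails with a bare KeyError from the server rather than a `ResourceNotFoundException`, and `put_secret_value` would happily create a secret literally named `''`. Drop the `if_none=''` defaults here, or have the backend run `_is_valid_identifier` before indexing, so a missing or unknown id is rejected as a client error. The `if_none=['AWSCURRENT']` default for `VersionStages` is fine as written, since a fresh list literal is built per call.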


@ -4,13 +4,15 @@ import boto3
from moto import mock_secretsmanager from moto import mock_secretsmanager
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import sure # noqa
import string import string
import unittest
import pytz import pytz
from datetime import datetime from datetime import datetime
import unittest
from nose.tools import assert_raises from nose.tools import assert_raises
DEFAULT_SECRET_NAME = 'test-secret'
@mock_secretsmanager @mock_secretsmanager
def test_get_secret_value(): def test_get_secret_value():
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')
@ -389,34 +391,32 @@ def test_restore_secret_that_does_not_exist():
@mock_secretsmanager @mock_secretsmanager
def test_rotate_secret(): def test_rotate_secret():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name, conn.create_secret(Name=DEFAULT_SECRET_NAME,
SecretString='foosecret') SecretString='foosecret')
rotated_secret = conn.rotate_secret(SecretId=secret_name) rotated_secret = conn.rotate_secret(SecretId=DEFAULT_SECRET_NAME)
assert rotated_secret assert rotated_secret
assert rotated_secret['ARN'] != '' # Test arn not empty assert rotated_secret['ARN'] != '' # Test arn not empty
assert rotated_secret['Name'] == secret_name assert rotated_secret['Name'] == DEFAULT_SECRET_NAME
assert rotated_secret['VersionId'] != '' assert rotated_secret['VersionId'] != ''
@mock_secretsmanager @mock_secretsmanager
def test_rotate_secret_enable_rotation(): def test_rotate_secret_enable_rotation():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name, conn.create_secret(Name=DEFAULT_SECRET_NAME,
SecretString='foosecret') SecretString='foosecret')
initial_description = conn.describe_secret(SecretId=secret_name) initial_description = conn.describe_secret(SecretId=DEFAULT_SECRET_NAME)
assert initial_description assert initial_description
assert initial_description['RotationEnabled'] is False assert initial_description['RotationEnabled'] is False
assert initial_description['RotationRules']['AutomaticallyAfterDays'] == 0 assert initial_description['RotationRules']['AutomaticallyAfterDays'] == 0
conn.rotate_secret(SecretId=secret_name, conn.rotate_secret(SecretId=DEFAULT_SECRET_NAME,
RotationRules={'AutomaticallyAfterDays': 42}) RotationRules={'AutomaticallyAfterDays': 42})
rotated_description = conn.describe_secret(SecretId=secret_name) rotated_description = conn.describe_secret(SecretId=DEFAULT_SECRET_NAME)
assert rotated_description assert rotated_description
assert rotated_description['RotationEnabled'] is True assert rotated_description['RotationEnabled'] is True
assert rotated_description['RotationRules']['AutomaticallyAfterDays'] == 42 assert rotated_description['RotationRules']['AutomaticallyAfterDays'] == 42
@ -460,9 +460,8 @@ def test_rotate_secret_client_request_token_too_short():
@mock_secretsmanager @mock_secretsmanager
def test_rotate_secret_client_request_token_too_long(): def test_rotate_secret_client_request_token_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name, conn.create_secret(Name=DEFAULT_SECRET_NAME,
SecretString='foosecret') SecretString='foosecret')
client_request_token = ( client_request_token = (
@ -470,19 +469,18 @@ def test_rotate_secret_client_request_token_too_long():
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C' 'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C'
) )
with assert_raises(ClientError): with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name, result = conn.rotate_secret(SecretId=DEFAULT_SECRET_NAME,
ClientRequestToken=client_request_token) ClientRequestToken=client_request_token)
@mock_secretsmanager @mock_secretsmanager
def test_rotate_secret_rotation_lambda_arn_too_long(): def test_rotate_secret_rotation_lambda_arn_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name, conn.create_secret(Name=DEFAULT_SECRET_NAME,
SecretString='foosecret') SecretString='foosecret')
rotation_lambda_arn = '85B7-446A-B7E4' * 147 # == 2058 characters rotation_lambda_arn = '85B7-446A-B7E4' * 147 # == 2058 characters
with assert_raises(ClientError): with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name, result = conn.rotate_secret(SecretId=DEFAULT_SECRET_NAME,
RotationLambdaARN=rotation_lambda_arn) RotationLambdaARN=rotation_lambda_arn)
@mock_secretsmanager @mock_secretsmanager
@ -494,12 +492,78 @@ def test_rotate_secret_rotation_period_zero():
@mock_secretsmanager @mock_secretsmanager
def test_rotate_secret_rotation_period_too_long(): def test_rotate_secret_rotation_period_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name, conn.create_secret(Name=DEFAULT_SECRET_NAME,
SecretString='foosecret') SecretString='foosecret')
rotation_rules = {'AutomaticallyAfterDays': 1001} rotation_rules = {'AutomaticallyAfterDays': 1001}
with assert_raises(ClientError): with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name, result = conn.rotate_secret(SecretId=DEFAULT_SECRET_NAME,
RotationRules=rotation_rules) RotationRules=rotation_rules)
@mock_secretsmanager
def test_put_secret_value_puts_new_secret():
conn = boto3.client('secretsmanager', region_name='us-west-2')
put_secret_value_dict = conn.put_secret_value(SecretId=DEFAULT_SECRET_NAME,
SecretString='foosecret',
VersionStages=['AWSCURRENT'])
version_id = put_secret_value_dict['VersionId']
get_secret_value_dict = conn.get_secret_value(SecretId=DEFAULT_SECRET_NAME,
VersionId=version_id,
VersionStage='AWSCURRENT')
assert get_secret_value_dict
assert get_secret_value_dict['SecretString'] == 'foosecret'
@mock_secretsmanager
def test_put_secret_value_can_get_first_version_if_put_twice():
conn = boto3.client('secretsmanager', region_name='us-west-2')
put_secret_value_dict = conn.put_secret_value(SecretId=DEFAULT_SECRET_NAME,
SecretString='first_secret',
VersionStages=['AWSCURRENT'])
first_version_id = put_secret_value_dict['VersionId']
conn.put_secret_value(SecretId=DEFAULT_SECRET_NAME,
SecretString='second_secret',
VersionStages=['AWSCURRENT'])
first_secret_value_dict = conn.get_secret_value(SecretId=DEFAULT_SECRET_NAME,
VersionId=first_version_id)
first_secret_value = first_secret_value_dict['SecretString']
assert first_secret_value == 'first_secret'
@mock_secretsmanager
def test_put_secret_value_versions_differ_if_same_secret_put_twice():
conn = boto3.client('secretsmanager', region_name='us-west-2')
put_secret_value_dict = conn.put_secret_value(SecretId=DEFAULT_SECRET_NAME,
SecretString='dupe_secret',
VersionStages=['AWSCURRENT'])
first_version_id = put_secret_value_dict['VersionId']
put_secret_value_dict = conn.put_secret_value(SecretId=DEFAULT_SECRET_NAME,
SecretString='dupe_secret',
VersionStages=['AWSCURRENT'])
second_version_id = put_secret_value_dict['VersionId']
assert first_version_id != second_version_id
@mock_secretsmanager
def test_can_list_secret_version_ids():
conn = boto3.client('secretsmanager', region_name='us-west-2')
put_secret_value_dict = conn.put_secret_value(SecretId=DEFAULT_SECRET_NAME,
SecretString='dupe_secret',
VersionStages=['AWSCURRENT'])
first_version_id = put_secret_value_dict['VersionId']
put_secret_value_dict = conn.put_secret_value(SecretId=DEFAULT_SECRET_NAME,
SecretString='dupe_secret',
VersionStages=['AWSCURRENT'])
second_version_id = put_secret_value_dict['VersionId']
versions_list = conn.list_secret_version_ids(SecretId=DEFAULT_SECRET_NAME)
returned_version_ids = [v['VersionId'] for v in versions_list['Versions']]
assert [first_version_id, second_version_id].sort() == returned_version_ids.sort()
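Review bot: The final assertion of `test_can_list_secret_version_ids` can never fail: `list.sort()` sorts in place and returns `None`, so `[first_version_id, second_version_id].sort() == returned_version_ids.sort()` compares `None == None` and the test passes even if the listing returned nothing. Use the value-returning form, `assert sorted([first_version_id, second_version_id]) == sorted(returned_version_ids)`. The same pattern appears in the `test_can_list_secret_version_ids` added to the server test file below, whose hunk runs past the end of this view.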


@ -10,6 +10,8 @@ from moto import mock_secretsmanager
Test the different server responses for secretsmanager Test the different server responses for secretsmanager
''' '''
DEFAULT_SECRET_NAME = 'test-secret'
@mock_secretsmanager @mock_secretsmanager
def test_get_secret_value(): def test_get_secret_value():
@ -18,19 +20,20 @@ def test_get_secret_value():
test_client = backend.test_client() test_client = backend.test_client()
create_secret = test_client.post('/', create_secret = test_client.post('/',
data={"Name": "test-secret", data={"Name": DEFAULT_SECRET_NAME,
"SecretString": "foo-secret"}, "SecretString": "foo-secret"},
headers={ headers={
"X-Amz-Target": "secretsmanager.CreateSecret"}, "X-Amz-Target": "secretsmanager.CreateSecret"},
) )
get_secret = test_client.post('/', get_secret = test_client.post('/',
data={"SecretId": "test-secret", data={"SecretId": DEFAULT_SECRET_NAME,
"VersionStage": "AWSCURRENT"}, "VersionStage": "AWSCURRENT"},
headers={ headers={
"X-Amz-Target": "secretsmanager.GetSecretValue"}, "X-Amz-Target": "secretsmanager.GetSecretValue"},
) )
json_data = json.loads(get_secret.data.decode("utf-8")) json_data = json.loads(get_secret.data.decode("utf-8"))
assert json_data['SecretString'] == 'foo-secret' assert json_data['SecretString'] == 'foo-secret'
@mock_secretsmanager @mock_secretsmanager
@ -55,7 +58,7 @@ def test_get_secret_that_does_not_match():
test_client = backend.test_client() test_client = backend.test_client()
create_secret = test_client.post('/', create_secret = test_client.post('/',
data={"Name": "test-secret", data={"Name": DEFAULT_SECRET_NAME,
"SecretString": "foo-secret"}, "SecretString": "foo-secret"},
headers={ headers={
"X-Amz-Target": "secretsmanager.CreateSecret"}, "X-Amz-Target": "secretsmanager.CreateSecret"},
@ -165,7 +168,7 @@ def test_describe_secret_that_does_not_match():
test_client = backend.test_client() test_client = backend.test_client()
create_secret = test_client.post('/', create_secret = test_client.post('/',
data={"Name": "test-secret", data={"Name": DEFAULT_SECRET_NAME,
"SecretString": "foosecret"}, "SecretString": "foosecret"},
headers={ headers={
"X-Amz-Target": "secretsmanager.CreateSecret" "X-Amz-Target": "secretsmanager.CreateSecret"
@ -188,7 +191,7 @@ def test_rotate_secret():
test_client = backend.test_client() test_client = backend.test_client()
create_secret = test_client.post('/', create_secret = test_client.post('/',
data={"Name": "test-secret", data={"Name": DEFAULT_SECRET_NAME,
"SecretString": "foosecret"}, "SecretString": "foosecret"},
headers={ headers={
"X-Amz-Target": "secretsmanager.CreateSecret" "X-Amz-Target": "secretsmanager.CreateSecret"
@ -197,7 +200,7 @@ def test_rotate_secret():
client_request_token = "EXAMPLE2-90ab-cdef-fedc-ba987SECRET2" client_request_token = "EXAMPLE2-90ab-cdef-fedc-ba987SECRET2"
rotate_secret = test_client.post('/', rotate_secret = test_client.post('/',
data={"SecretId": "test-secret", data={"SecretId": DEFAULT_SECRET_NAME,
"ClientRequestToken": client_request_token}, "ClientRequestToken": client_request_token},
headers={ headers={
"X-Amz-Target": "secretsmanager.RotateSecret" "X-Amz-Target": "secretsmanager.RotateSecret"
@ -207,7 +210,7 @@ def test_rotate_secret():
json_data = json.loads(rotate_secret.data.decode("utf-8")) json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data # Returned dict is not empty assert json_data # Returned dict is not empty
assert json_data['ARN'] != '' assert json_data['ARN'] != ''
assert json_data['Name'] == 'test-secret' assert json_data['Name'] == DEFAULT_SECRET_NAME
assert json_data['VersionId'] == client_request_token assert json_data['VersionId'] == client_request_token
# @mock_secretsmanager # @mock_secretsmanager
@ -289,7 +292,7 @@ def test_rotate_secret_that_does_not_match():
test_client = backend.test_client() test_client = backend.test_client()
create_secret = test_client.post('/', create_secret = test_client.post('/',
data={"Name": "test-secret", data={"Name": DEFAULT_SECRET_NAME,
"SecretString": "foosecret"}, "SecretString": "foosecret"},
headers={ headers={
"X-Amz-Target": "secretsmanager.CreateSecret" "X-Amz-Target": "secretsmanager.CreateSecret"
@ -313,7 +316,7 @@ def test_rotate_secret_client_request_token_too_short():
test_client = backend.test_client() test_client = backend.test_client()
create_secret = test_client.post('/', create_secret = test_client.post('/',
data={"Name": "test-secret", data={"Name": DEFAULT_SECRET_NAME,
"SecretString": "foosecret"}, "SecretString": "foosecret"},
headers={ headers={
"X-Amz-Target": "secretsmanager.CreateSecret" "X-Amz-Target": "secretsmanager.CreateSecret"
@ -322,7 +325,7 @@ def test_rotate_secret_client_request_token_too_short():
client_request_token = "ED9F8B6C-85B7-B7E4-38F2A3BEB13C" client_request_token = "ED9F8B6C-85B7-B7E4-38F2A3BEB13C"
rotate_secret = test_client.post('/', rotate_secret = test_client.post('/',
data={"SecretId": "test-secret", data={"SecretId": DEFAULT_SECRET_NAME,
"ClientRequestToken": client_request_token}, "ClientRequestToken": client_request_token},
headers={ headers={
"X-Amz-Target": "secretsmanager.RotateSecret" "X-Amz-Target": "secretsmanager.RotateSecret"
@ -339,7 +342,7 @@ def test_rotate_secret_client_request_token_too_long():
test_client = backend.test_client() test_client = backend.test_client()
create_secret = test_client.post('/', create_secret = test_client.post('/',
data={"Name": "test-secret", data={"Name": DEFAULT_SECRET_NAME,
"SecretString": "foosecret"}, "SecretString": "foosecret"},
headers={ headers={
"X-Amz-Target": "secretsmanager.CreateSecret" "X-Amz-Target": "secretsmanager.CreateSecret"
@ -351,7 +354,7 @@ def test_rotate_secret_client_request_token_too_long():
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C' 'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C'
) )
rotate_secret = test_client.post('/', rotate_secret = test_client.post('/',
data={"SecretId": "test-secret", data={"SecretId": DEFAULT_SECRET_NAME,
"ClientRequestToken": client_request_token}, "ClientRequestToken": client_request_token},
headers={ headers={
"X-Amz-Target": "secretsmanager.RotateSecret" "X-Amz-Target": "secretsmanager.RotateSecret"
@ -368,7 +371,7 @@ def test_rotate_secret_rotation_lambda_arn_too_long():
test_client = backend.test_client() test_client = backend.test_client()
create_secret = test_client.post('/', create_secret = test_client.post('/',
data={"Name": "test-secret", data={"Name": DEFAULT_SECRET_NAME,
"SecretString": "foosecret"}, "SecretString": "foosecret"},
headers={ headers={
"X-Amz-Target": "secretsmanager.CreateSecret" "X-Amz-Target": "secretsmanager.CreateSecret"
@ -377,7 +380,7 @@ def test_rotate_secret_rotation_lambda_arn_too_long():
rotation_lambda_arn = '85B7-446A-B7E4' * 147 # == 2058 characters rotation_lambda_arn = '85B7-446A-B7E4' * 147 # == 2058 characters
rotate_secret = test_client.post('/', rotate_secret = test_client.post('/',
data={"SecretId": "test-secret", data={"SecretId": DEFAULT_SECRET_NAME,
"RotationLambdaARN": rotation_lambda_arn}, "RotationLambdaARN": rotation_lambda_arn},
headers={ headers={
"X-Amz-Target": "secretsmanager.RotateSecret" "X-Amz-Target": "secretsmanager.RotateSecret"
@ -389,6 +392,164 @@ def test_rotate_secret_rotation_lambda_arn_too_long():
assert json_data['__type'] == 'InvalidParameterException' assert json_data['__type'] == 'InvalidParameterException'
@mock_secretsmanager
def test_put_secret_value_puts_new_secret():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"SecretString": "foosecret",
"VersionStages": ["AWSCURRENT"]},
headers={
"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
put_second_secret_value_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"SecretString": "foosecret",
"VersionStages": ["AWSCURRENT"]},
headers={
"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
second_secret_json_data = json.loads(put_second_secret_value_json.data.decode("utf-8"))
version_id = second_secret_json_data['VersionId']
secret_value_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"VersionId": version_id,
"VersionStage": 'AWSCURRENT'},
headers={
"X-Amz-Target": "secretsmanager.GetSecretValue"},
)
second_secret_json_data = json.loads(secret_value_json.data.decode("utf-8"))
assert second_secret_json_data
assert second_secret_json_data['SecretString'] == 'foosecret'
@mock_secretsmanager
def test_put_secret_value_can_get_first_version_if_put_twice():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
first_secret_string = 'first_secret'
second_secret_string = 'second_secret'
put_first_secret_value_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"SecretString": first_secret_string,
"VersionStages": ["AWSCURRENT"]},
headers={
"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
first_secret_json_data = json.loads(put_first_secret_value_json.data.decode("utf-8"))
first_secret_version_id = first_secret_json_data['VersionId']
test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"SecretString": second_secret_string,
"VersionStages": ["AWSCURRENT"]},
headers={
"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
get_first_secret_value_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"VersionId": first_secret_version_id,
"VersionStage": 'AWSCURRENT'},
headers={
"X-Amz-Target": "secretsmanager.GetSecretValue"},
)
get_first_secret_json_data = json.loads(get_first_secret_value_json.data.decode("utf-8"))
assert get_first_secret_json_data
assert get_first_secret_json_data['SecretString'] == first_secret_string
@mock_secretsmanager
def test_put_secret_value_versions_differ_if_same_secret_put_twice():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
put_first_secret_value_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"SecretString": "secret",
"VersionStages": ["AWSCURRENT"]},
headers={
"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
first_secret_json_data = json.loads(put_first_secret_value_json.data.decode("utf-8"))
first_secret_version_id = first_secret_json_data['VersionId']
put_second_secret_value_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"SecretString": "secret",
"VersionStages": ["AWSCURRENT"]},
headers={
"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
second_secret_json_data = json.loads(put_second_secret_value_json.data.decode("utf-8"))
second_secret_version_id = second_secret_json_data['VersionId']
assert first_secret_version_id != second_secret_version_id
@mock_secretsmanager
def test_can_list_secret_version_ids():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
put_first_secret_value_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"SecretString": "secret",
"VersionStages": ["AWSCURRENT"]},
headers={
"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
first_secret_json_data = json.loads(put_first_secret_value_json.data.decode("utf-8"))
first_secret_version_id = first_secret_json_data['VersionId']
put_second_secret_value_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME,
"SecretString": "secret",
"VersionStages": ["AWSCURRENT"]},
headers={
"X-Amz-Target": "secretsmanager.PutSecretValue"},
)
second_secret_json_data = json.loads(put_second_secret_value_json.data.decode("utf-8"))
second_secret_version_id = second_secret_json_data['VersionId']
list_secret_versions_json = test_client.post('/',
data={
"SecretId": DEFAULT_SECRET_NAME, },
headers={
"X-Amz-Target": "secretsmanager.ListSecretVersionIds"},
)
versions_list = json.loads(list_secret_versions_json.data.decode("utf-8"))
returned_version_ids = [v['VersionId'] for v in versions_list['Versions']]
assert [first_secret_version_id, second_secret_version_id].sort() == returned_version_ids.sort()
# #
# The following tests should work, but fail on the embedded dict in # The following tests should work, but fail on the embedded dict in
# RotationRules. The error message suggests a problem deeper in the code, which # RotationRules. The error message suggests a problem deeper in the code, which