SecretsManager secret value binary support (#2222)

This commit is contained in:
A 2019-05-28 16:32:43 +01:00 committed by Terry Cain
parent 21917c4b93
commit 2386d47fe3
3 changed files with 31 additions and 7 deletions

View File

@ -70,24 +70,31 @@ class SecretsManagerBackend(BaseBackend):
secret_version = secret['versions'][version_id] secret_version = secret['versions'][version_id]
response = json.dumps({ response_data = {
"ARN": secret_arn(self.region, secret['secret_id']), "ARN": secret_arn(self.region, secret['secret_id']),
"Name": secret['name'], "Name": secret['name'],
"VersionId": secret_version['version_id'], "VersionId": secret_version['version_id'],
"SecretString": secret_version['secret_string'],
"VersionStages": secret_version['version_stages'], "VersionStages": secret_version['version_stages'],
"CreatedDate": secret_version['createdate'], "CreatedDate": secret_version['createdate'],
}) }
if 'secret_string' in secret_version:
response_data["SecretString"] = secret_version['secret_string']
if 'secret_binary' in secret_version:
response_data["SecretBinary"] = secret_version['secret_binary']
response = json.dumps(response_data)
return response return response
def create_secret(self, name, secret_string, tags, **kwargs): def create_secret(self, name, secret_string=None, secret_binary=None, tags=[], **kwargs):
# error if secret exists # error if secret exists
if name in self.secrets.keys(): if name in self.secrets.keys():
raise ResourceExistsException('A resource with the ID you requested already exists.') raise ResourceExistsException('A resource with the ID you requested already exists.')
version_id = self._add_secret(name, secret_string, tags=tags) version_id = self._add_secret(name, secret_string=secret_string, secret_binary=secret_binary, tags=tags)
response = json.dumps({ response = json.dumps({
"ARN": secret_arn(self.region, name), "ARN": secret_arn(self.region, name),
@ -97,7 +104,7 @@ class SecretsManagerBackend(BaseBackend):
return response return response
def _add_secret(self, secret_id, secret_string, tags=[], version_id=None, version_stages=None): def _add_secret(self, secret_id, secret_string=None, secret_binary=None, tags=[], version_id=None, version_stages=None):
if version_stages is None: if version_stages is None:
version_stages = ['AWSCURRENT'] version_stages = ['AWSCURRENT']
@ -106,12 +113,17 @@ class SecretsManagerBackend(BaseBackend):
version_id = str(uuid.uuid4()) version_id = str(uuid.uuid4())
secret_version = { secret_version = {
'secret_string': secret_string,
'createdate': int(time.time()), 'createdate': int(time.time()),
'version_id': version_id, 'version_id': version_id,
'version_stages': version_stages, 'version_stages': version_stages,
} }
if secret_string is not None:
secret_version['secret_string'] = secret_string
if secret_binary is not None:
secret_version['secret_binary'] = secret_binary
if secret_id in self.secrets: if secret_id in self.secrets:
# remove all old AWSPREVIOUS stages # remove all old AWSPREVIOUS stages
for secret_verion_to_look_at in self.secrets[secret_id]['versions'].values(): for secret_verion_to_look_at in self.secrets[secret_id]['versions'].values():

View File

@ -21,10 +21,12 @@ class SecretsManagerResponse(BaseResponse):
def create_secret(self): def create_secret(self):
name = self._get_param('Name') name = self._get_param('Name')
secret_string = self._get_param('SecretString') secret_string = self._get_param('SecretString')
secret_binary = self._get_param('SecretBinary')
tags = self._get_param('Tags', if_none=[]) tags = self._get_param('Tags', if_none=[])
return secretsmanager_backends[self.region].create_secret( return secretsmanager_backends[self.region].create_secret(
name=name, name=name,
secret_string=secret_string, secret_string=secret_string,
secret_binary=secret_binary,
tags=tags tags=tags
) )

View File

@ -9,6 +9,7 @@ import unittest
import pytz import pytz
from datetime import datetime from datetime import datetime
from nose.tools import assert_raises from nose.tools import assert_raises
from six import b
DEFAULT_SECRET_NAME = 'test-secret' DEFAULT_SECRET_NAME = 'test-secret'
@ -22,6 +23,15 @@ def test_get_secret_value():
result = conn.get_secret_value(SecretId='java-util-test-password') result = conn.get_secret_value(SecretId='java-util-test-password')
assert result['SecretString'] == 'foosecret' assert result['SecretString'] == 'foosecret'
@mock_secretsmanager
def test_get_secret_value_binary():
conn = boto3.client('secretsmanager', region_name='us-west-2')
create_secret = conn.create_secret(Name='java-util-test-password',
SecretBinary=b("foosecret"))
result = conn.get_secret_value(SecretId='java-util-test-password')
assert result['SecretBinary'] == b('foosecret')
@mock_secretsmanager @mock_secretsmanager
def test_get_secret_that_does_not_exist(): def test_get_secret_that_does_not_exist():
conn = boto3.client('secretsmanager', region_name='us-west-2') conn = boto3.client('secretsmanager', region_name='us-west-2')