refactor to store multiple scrects, use uuid
This commit is contained in:
parent
71a054af92
commit
b485122ec6
2
.gitignore
vendored
2
.gitignore
vendored
@ -15,4 +15,4 @@ python_env
|
|||||||
.ropeproject/
|
.ropeproject/
|
||||||
.pytest_cache/
|
.pytest_cache/
|
||||||
venv/
|
venv/
|
||||||
|
.vscode/
|
||||||
|
@ -2,6 +2,7 @@ from __future__ import unicode_literals
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
|
import uuid
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
|
|
||||||
@ -18,10 +19,6 @@ class SecretsManager(BaseModel):
|
|||||||
|
|
||||||
def __init__(self, region_name, **kwargs):
|
def __init__(self, region_name, **kwargs):
|
||||||
self.region = region_name
|
self.region = region_name
|
||||||
self.secret_id = kwargs.get('secret_id', '')
|
|
||||||
self.version_id = kwargs.get('version_id', '')
|
|
||||||
self.version_stage = kwargs.get('version_stage', '')
|
|
||||||
self.secret_string = ''
|
|
||||||
|
|
||||||
|
|
||||||
class SecretsManagerBackend(BaseBackend):
|
class SecretsManagerBackend(BaseBackend):
|
||||||
@ -29,14 +26,7 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
def __init__(self, region_name=None, **kwargs):
|
def __init__(self, region_name=None, **kwargs):
|
||||||
super(SecretsManagerBackend, self).__init__()
|
super(SecretsManagerBackend, self).__init__()
|
||||||
self.region = region_name
|
self.region = region_name
|
||||||
self.secret_id = kwargs.get('secret_id', '')
|
self.secrets = {}
|
||||||
self.name = kwargs.get('name', '')
|
|
||||||
self.createdate = int(time.time())
|
|
||||||
self.secret_string = ''
|
|
||||||
self.rotation_enabled = False
|
|
||||||
self.rotation_lambda_arn = ''
|
|
||||||
self.auto_rotate_after_days = 0
|
|
||||||
self.version_id = ''
|
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
region_name = self.region
|
region_name = self.region
|
||||||
@ -44,36 +34,49 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
self.__init__(region_name)
|
self.__init__(region_name)
|
||||||
|
|
||||||
def _is_valid_identifier(self, identifier):
|
def _is_valid_identifier(self, identifier):
|
||||||
return identifier in (self.name, self.secret_id)
|
return identifier in self.secrets
|
||||||
|
|
||||||
def get_secret_value(self, secret_id, version_id, version_stage):
|
def get_secret_value(self, secret_id, version_id, version_stage):
|
||||||
|
|
||||||
if not self._is_valid_identifier(secret_id):
|
if not self._is_valid_identifier(secret_id):
|
||||||
raise ResourceNotFoundException()
|
raise ResourceNotFoundException()
|
||||||
|
|
||||||
|
secret = self.secrets[secret_id]
|
||||||
|
|
||||||
response = json.dumps({
|
response = json.dumps({
|
||||||
"ARN": secret_arn(self.region, self.secret_id),
|
"ARN": secret_arn(self.region, secret['secret_id']),
|
||||||
"Name": self.name,
|
"Name": secret['name'],
|
||||||
"VersionId": "A435958A-D821-4193-B719-B7769357AER4",
|
"VersionId": secret['version_id'],
|
||||||
"SecretString": self.secret_string,
|
"SecretString": secret['secret_string'],
|
||||||
"VersionStages": [
|
"VersionStages": [
|
||||||
"AWSCURRENT",
|
"AWSCURRENT",
|
||||||
],
|
],
|
||||||
"CreatedDate": "2018-05-23 13:16:57.198000"
|
"CreatedDate": secret['createdate']
|
||||||
})
|
})
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def create_secret(self, name, secret_string, **kwargs):
|
def create_secret(self, name, secret_string, **kwargs):
|
||||||
|
|
||||||
self.secret_string = secret_string
|
generated_version_id = str(uuid.uuid4())
|
||||||
self.secret_id = name
|
|
||||||
self.name = name
|
secret = {
|
||||||
|
'secret_string': secret_string,
|
||||||
|
'secret_id': name,
|
||||||
|
'name': name,
|
||||||
|
'createdate': int(time.time()),
|
||||||
|
'rotation_enabled': False,
|
||||||
|
'rotation_lambda_arn': '',
|
||||||
|
'auto_rotate_after_days': 0,
|
||||||
|
'version_id': generated_version_id
|
||||||
|
}
|
||||||
|
|
||||||
|
self.secrets[name] = secret
|
||||||
|
|
||||||
response = json.dumps({
|
response = json.dumps({
|
||||||
"ARN": secret_arn(self.region, name),
|
"ARN": secret_arn(self.region, name),
|
||||||
"Name": self.name,
|
"Name": name,
|
||||||
"VersionId": "A435958A-D821-4193-B719-B7769357AER4",
|
"VersionId": generated_version_id,
|
||||||
})
|
})
|
||||||
|
|
||||||
return response
|
return response
|
||||||
@ -82,15 +85,17 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
if not self._is_valid_identifier(secret_id):
|
if not self._is_valid_identifier(secret_id):
|
||||||
raise ResourceNotFoundException
|
raise ResourceNotFoundException
|
||||||
|
|
||||||
|
secret = self.secrets[secret_id]
|
||||||
|
|
||||||
response = json.dumps({
|
response = json.dumps({
|
||||||
"ARN": secret_arn(self.region, self.secret_id),
|
"ARN": secret_arn(self.region, secret['secret_id']),
|
||||||
"Name": self.name,
|
"Name": secret['name'],
|
||||||
"Description": "",
|
"Description": "",
|
||||||
"KmsKeyId": "",
|
"KmsKeyId": "",
|
||||||
"RotationEnabled": self.rotation_enabled,
|
"RotationEnabled": secret['rotation_enabled'],
|
||||||
"RotationLambdaARN": self.rotation_lambda_arn,
|
"RotationLambdaARN": secret['rotation_lambda_arn'],
|
||||||
"RotationRules": {
|
"RotationRules": {
|
||||||
"AutomaticallyAfterDays": self.auto_rotate_after_days
|
"AutomaticallyAfterDays": secret['auto_rotate_after_days']
|
||||||
},
|
},
|
||||||
"LastRotatedDate": None,
|
"LastRotatedDate": None,
|
||||||
"LastChangedDate": None,
|
"LastChangedDate": None,
|
||||||
@ -141,17 +146,19 @@ class SecretsManagerBackend(BaseBackend):
|
|||||||
)
|
)
|
||||||
raise InvalidParameterException(msg)
|
raise InvalidParameterException(msg)
|
||||||
|
|
||||||
self.version_id = client_request_token or ''
|
secret = self.secrets[secret_id]
|
||||||
self.rotation_lambda_arn = rotation_lambda_arn or ''
|
|
||||||
|
secret['version_id'] = client_request_token or ''
|
||||||
|
secret['rotation_lambda_arn'] = rotation_lambda_arn or ''
|
||||||
if rotation_rules:
|
if rotation_rules:
|
||||||
self.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
|
secret['auto_rotate_after_days'] = rotation_rules.get(rotation_days, 0)
|
||||||
if self.auto_rotate_after_days > 0:
|
if secret['auto_rotate_after_days'] > 0:
|
||||||
self.rotation_enabled = True
|
secret['rotation_enabled'] = True
|
||||||
|
|
||||||
response = json.dumps({
|
response = json.dumps({
|
||||||
"ARN": secret_arn(self.region, self.secret_id),
|
"ARN": secret_arn(self.region, secret['secret_id']),
|
||||||
"Name": self.name,
|
"Name": secret['name'],
|
||||||
"VersionId": self.version_id
|
"VersionId": secret['version_id']
|
||||||
})
|
})
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
@ -52,8 +52,9 @@ def random_password(password_length, exclude_characters, exclude_numbers,
|
|||||||
|
|
||||||
|
|
||||||
def secret_arn(region, secret_id):
|
def secret_arn(region, secret_id):
|
||||||
return "arn:aws:secretsmanager:{0}:1234567890:secret:{1}-rIjad".format(
|
id_string = ''.join(random.choice(string.ascii_letters) for _ in range(5))
|
||||||
region, secret_id)
|
return "arn:aws:secretsmanager:{0}:1234567890:secret:{1}-{2}".format(
|
||||||
|
region, secret_id, id_string)
|
||||||
|
|
||||||
|
|
||||||
def _exclude_characters(password, exclude_characters):
|
def _exclude_characters(password, exclude_characters):
|
||||||
|
@ -39,8 +39,7 @@ def test_create_secret():
|
|||||||
conn = boto3.client('secretsmanager', region_name='us-east-1')
|
conn = boto3.client('secretsmanager', region_name='us-east-1')
|
||||||
|
|
||||||
result = conn.create_secret(Name='test-secret', SecretString="foosecret")
|
result = conn.create_secret(Name='test-secret', SecretString="foosecret")
|
||||||
assert result['ARN'] == (
|
assert result['ARN']
|
||||||
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad')
|
|
||||||
assert result['Name'] == 'test-secret'
|
assert result['Name'] == 'test-secret'
|
||||||
secret = conn.get_secret_value(SecretId='test-secret')
|
secret = conn.get_secret_value(SecretId='test-secret')
|
||||||
assert secret['SecretString'] == 'foosecret'
|
assert secret['SecretString'] == 'foosecret'
|
||||||
@ -159,10 +158,17 @@ def test_describe_secret():
|
|||||||
conn.create_secret(Name='test-secret',
|
conn.create_secret(Name='test-secret',
|
||||||
SecretString='foosecret')
|
SecretString='foosecret')
|
||||||
|
|
||||||
|
conn.create_secret(Name='test-secret-2',
|
||||||
|
SecretString='barsecret')
|
||||||
|
|
||||||
secret_description = conn.describe_secret(SecretId='test-secret')
|
secret_description = conn.describe_secret(SecretId='test-secret')
|
||||||
|
secret_description_2 = conn.describe_secret(SecretId='test-secret-2')
|
||||||
|
|
||||||
assert secret_description # Returned dict is not empty
|
assert secret_description # Returned dict is not empty
|
||||||
assert secret_description['ARN'] == (
|
assert secret_description['Name'] == ('test-secret')
|
||||||
'arn:aws:secretsmanager:us-west-2:1234567890:secret:test-secret-rIjad')
|
assert secret_description['ARN'] != '' # Test arn not empty
|
||||||
|
assert secret_description_2['Name'] == ('test-secret-2')
|
||||||
|
assert secret_description_2['ARN'] != '' # Test arn not empty
|
||||||
|
|
||||||
@mock_secretsmanager
|
@mock_secretsmanager
|
||||||
def test_describe_secret_that_does_not_exist():
|
def test_describe_secret_that_does_not_exist():
|
||||||
@ -190,9 +196,7 @@ def test_rotate_secret():
|
|||||||
rotated_secret = conn.rotate_secret(SecretId=secret_name)
|
rotated_secret = conn.rotate_secret(SecretId=secret_name)
|
||||||
|
|
||||||
assert rotated_secret
|
assert rotated_secret
|
||||||
assert rotated_secret['ARN'] == (
|
assert rotated_secret['ARN'] != '' # Test arn not empty
|
||||||
'arn:aws:secretsmanager:us-west-2:1234567890:secret:test-secret-rIjad'
|
|
||||||
)
|
|
||||||
assert rotated_secret['Name'] == secret_name
|
assert rotated_secret['Name'] == secret_name
|
||||||
assert rotated_secret['VersionId'] != ''
|
assert rotated_secret['VersionId'] != ''
|
||||||
|
|
||||||
|
@ -82,12 +82,21 @@ def test_create_secret():
|
|||||||
headers={
|
headers={
|
||||||
"X-Amz-Target": "secretsmanager.CreateSecret"},
|
"X-Amz-Target": "secretsmanager.CreateSecret"},
|
||||||
)
|
)
|
||||||
|
res_2 = test_client.post('/',
|
||||||
|
data={"Name": "test-secret-2",
|
||||||
|
"SecretString": "bar-secret"},
|
||||||
|
headers={
|
||||||
|
"X-Amz-Target": "secretsmanager.CreateSecret"},
|
||||||
|
)
|
||||||
|
|
||||||
json_data = json.loads(res.data.decode("utf-8"))
|
json_data = json.loads(res.data.decode("utf-8"))
|
||||||
assert json_data['ARN'] == (
|
assert json_data['ARN'] != ''
|
||||||
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad')
|
|
||||||
assert json_data['Name'] == 'test-secret'
|
assert json_data['Name'] == 'test-secret'
|
||||||
|
|
||||||
|
json_data_2 = json.loads(res_2.data.decode("utf-8"))
|
||||||
|
assert json_data_2['ARN'] != ''
|
||||||
|
assert json_data_2['Name'] == 'test-secret-2'
|
||||||
|
|
||||||
@mock_secretsmanager
|
@mock_secretsmanager
|
||||||
def test_describe_secret():
|
def test_describe_secret():
|
||||||
|
|
||||||
@ -108,11 +117,29 @@ def test_describe_secret():
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
create_secret_2 = test_client.post('/',
|
||||||
|
data={"Name": "test-secret-2",
|
||||||
|
"SecretString": "barsecret"},
|
||||||
|
headers={
|
||||||
|
"X-Amz-Target": "secretsmanager.CreateSecret"
|
||||||
|
},
|
||||||
|
)
|
||||||
|
describe_secret_2 = test_client.post('/',
|
||||||
|
data={"SecretId": "test-secret-2"},
|
||||||
|
headers={
|
||||||
|
"X-Amz-Target": "secretsmanager.DescribeSecret"
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
json_data = json.loads(describe_secret.data.decode("utf-8"))
|
json_data = json.loads(describe_secret.data.decode("utf-8"))
|
||||||
assert json_data # Returned dict is not empty
|
assert json_data # Returned dict is not empty
|
||||||
assert json_data['ARN'] == (
|
assert json_data['ARN'] != ''
|
||||||
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad'
|
assert json_data['Name'] == 'test-secret'
|
||||||
)
|
|
||||||
|
json_data_2 = json.loads(describe_secret_2.data.decode("utf-8"))
|
||||||
|
assert json_data_2 # Returned dict is not empty
|
||||||
|
assert json_data_2['ARN'] != ''
|
||||||
|
assert json_data_2['Name'] == 'test-secret-2'
|
||||||
|
|
||||||
@mock_secretsmanager
|
@mock_secretsmanager
|
||||||
def test_describe_secret_that_does_not_exist():
|
def test_describe_secret_that_does_not_exist():
|
||||||
@ -179,9 +206,7 @@ def test_rotate_secret():
|
|||||||
|
|
||||||
json_data = json.loads(rotate_secret.data.decode("utf-8"))
|
json_data = json.loads(rotate_secret.data.decode("utf-8"))
|
||||||
assert json_data # Returned dict is not empty
|
assert json_data # Returned dict is not empty
|
||||||
assert json_data['ARN'] == (
|
assert json_data['ARN'] != ''
|
||||||
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad'
|
|
||||||
)
|
|
||||||
assert json_data['Name'] == 'test-secret'
|
assert json_data['Name'] == 'test-secret'
|
||||||
assert json_data['VersionId'] == client_request_token
|
assert json_data['VersionId'] == client_request_token
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user