Merge pull request #1782 from NeilRoberts/Moto-1781_add_rotate_secret_to_secretsmanager

Issue# 1781 implement secretsmanager.RotateSecret
This commit is contained in:
Steve Pulec 2018-09-22 16:21:31 -04:00 committed by GitHub
commit 2d978aa1c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 458 additions and 4 deletions

View File

@ -3645,7 +3645,7 @@
- [ ] put_attributes
- [ ] select
## secretsmanager - 27% implemented
## secretsmanager - 33% implemented
- [ ] cancel_rotate_secret
- [X] create_secret
- [ ] delete_secret
@ -3656,7 +3656,7 @@
- [ ] list_secrets
- [ ] put_secret_value
- [ ] restore_secret
- [ ] rotate_secret
- [X] rotate_secret
- [ ] tag_resource
- [ ] untag_resource
- [ ] update_secret

View File

@ -36,6 +36,7 @@ class SecretsManagerBackend(BaseBackend):
self.rotation_enabled = False
self.rotation_lambda_arn = ''
self.auto_rotate_after_days = 0
self.version_id = ''
def reset(self):
region_name = self.region
@ -105,6 +106,56 @@ class SecretsManagerBackend(BaseBackend):
return response
def rotate_secret(self, secret_id, client_request_token=None,
rotation_lambda_arn=None, rotation_rules=None):
rotation_days = 'AutomaticallyAfterDays'
if not self._is_valid_identifier(secret_id):
raise ResourceNotFoundException
if client_request_token:
token_length = len(client_request_token)
if token_length < 32 or token_length > 64:
msg = (
'ClientRequestToken '
'must be 32-64 characters long.'
)
raise InvalidParameterException(msg)
if rotation_lambda_arn:
if len(rotation_lambda_arn) > 2048:
msg = (
'RotationLambdaARN '
'must <= 2048 characters long.'
)
raise InvalidParameterException(msg)
if rotation_rules:
if rotation_days in rotation_rules:
rotation_period = rotation_rules[rotation_days]
if rotation_period < 1 or rotation_period > 1000:
msg = (
'RotationRules.AutomaticallyAfterDays '
'must be within 1-1000.'
)
raise InvalidParameterException(msg)
self.version_id = client_request_token or ''
self.rotation_lambda_arn = rotation_lambda_arn or ''
if rotation_rules:
self.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
if self.auto_rotate_after_days > 0:
self.rotation_enabled = True
response = json.dumps({
"ARN": secret_arn(self.region, self.secret_id),
"Name": self.name,
"VersionId": self.version_id
})
return response
def get_random_password(self, password_length,
exclude_characters, exclude_numbers,
exclude_punctuation, exclude_uppercase,

View File

@ -50,3 +50,15 @@ class SecretsManagerResponse(BaseResponse):
return secretsmanager_backends[self.region].describe_secret(
secret_id=secret_id
)
def rotate_secret(self):
client_request_token = self._get_param('ClientRequestToken')
rotation_lambda_arn = self._get_param('RotationLambdaARN')
rotation_rules = self._get_param('RotationRules')
secret_id = self._get_param('SecretId')
return secretsmanager_backends[self.region].rotate_secret(
secret_id=secret_id,
client_request_token=client_request_token,
rotation_lambda_arn=rotation_lambda_arn,
rotation_rules=rotation_rules
)

View File

@ -26,13 +26,13 @@ def test_get_secret_that_does_not_exist():
result = conn.get_secret_value(SecretId='i-dont-exist')
@mock_secretsmanager
def test_get_secret_with_mismatched_id():
def test_get_secret_that_does_not_match():
conn = boto3.client('secretsmanager', region_name='us-west-2')
create_secret = conn.create_secret(Name='java-util-test-password',
SecretString="foosecret")
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-exist')
result = conn.get_secret_value(SecretId='i-dont-match')
@mock_secretsmanager
def test_create_secret():
@ -179,3 +179,108 @@ def test_describe_secret_that_does_not_match():
with assert_raises(ClientError):
result = conn.get_secret_value(SecretId='i-dont-match')
@mock_secretsmanager
def test_rotate_secret():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
rotated_secret = conn.rotate_secret(SecretId=secret_name)
assert rotated_secret
assert rotated_secret['ARN'] == (
'arn:aws:secretsmanager:us-west-2:1234567890:secret:test-secret-rIjad'
)
assert rotated_secret['Name'] == secret_name
assert rotated_secret['VersionId'] != ''
@mock_secretsmanager
def test_rotate_secret_enable_rotation():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
initial_description = conn.describe_secret(SecretId=secret_name)
assert initial_description
assert initial_description['RotationEnabled'] is False
assert initial_description['RotationRules']['AutomaticallyAfterDays'] == 0
conn.rotate_secret(SecretId=secret_name,
RotationRules={'AutomaticallyAfterDays': 42})
rotated_description = conn.describe_secret(SecretId=secret_name)
assert rotated_description
assert rotated_description['RotationEnabled'] is True
assert rotated_description['RotationRules']['AutomaticallyAfterDays'] == 42
@mock_secretsmanager
def test_rotate_secret_that_does_not_exist():
conn = boto3.client('secretsmanager', 'us-west-2')
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId='i-dont-exist')
@mock_secretsmanager
def test_rotate_secret_that_does_not_match():
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name='test-secret',
SecretString='foosecret')
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId='i-dont-match')
@mock_secretsmanager
def test_rotate_secret_client_request_token_too_short():
# Test is intentionally empty. Boto3 catches too short ClientRequestToken
# and raises ParamValidationError before Moto can see it.
# test_server actually handles this error.
assert True
@mock_secretsmanager
def test_rotate_secret_client_request_token_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
client_request_token = (
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C-'
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C'
)
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name,
ClientRequestToken=client_request_token)
@mock_secretsmanager
def test_rotate_secret_rotation_lambda_arn_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
rotation_lambda_arn = '85B7-446A-B7E4' * 147 # == 2058 characters
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name,
RotationLambdaARN=rotation_lambda_arn)
@mock_secretsmanager
def test_rotate_secret_rotation_period_zero():
# Test is intentionally empty. Boto3 catches zero day rotation period
# and raises ParamValidationError before Moto can see it.
# test_server actually handles this error.
assert True
@mock_secretsmanager
def test_rotate_secret_rotation_period_too_long():
secret_name = 'test-secret'
conn = boto3.client('secretsmanager', region_name='us-west-2')
conn.create_secret(Name=secret_name,
SecretString='foosecret')
rotation_rules = {'AutomaticallyAfterDays': 1001}
with assert_raises(ClientError):
result = conn.rotate_secret(SecretId=secret_name,
RotationRules=rotation_rules)

View File

@ -49,6 +49,27 @@ def test_get_secret_that_does_not_exist():
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_get_secret_that_does_not_match():
backend = server.create_backend_app("secretsmanager")
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foo-secret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"},
)
get_secret = test_client.post('/',
data={"SecretId": "i-dont-match",
"VersionStage": "AWSCURRENT"},
headers={
"X-Amz-Target": "secretsmanager.GetSecretValue"},
)
json_data = json.loads(get_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_create_secret():
@ -133,3 +154,268 @@ def test_describe_secret_that_does_not_match():
json_data = json.loads(describe_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_rotate_secret():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
client_request_token = "EXAMPLE2-90ab-cdef-fedc-ba987SECRET2"
rotate_secret = test_client.post('/',
data={"SecretId": "test-secret",
"ClientRequestToken": client_request_token},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data # Returned dict is not empty
assert json_data['ARN'] == (
'arn:aws:secretsmanager:us-east-1:1234567890:secret:test-secret-rIjad'
)
assert json_data['Name'] == 'test-secret'
assert json_data['VersionId'] == client_request_token
# @mock_secretsmanager
# def test_rotate_secret_enable_rotation():
# backend = server.create_backend_app('secretsmanager')
# test_client = backend.test_client()
# create_secret = test_client.post(
# '/',
# data={
# "Name": "test-secret",
# "SecretString": "foosecret"
# },
# headers={
# "X-Amz-Target": "secretsmanager.CreateSecret"
# },
# )
# initial_description = test_client.post(
# '/',
# data={
# "SecretId": "test-secret"
# },
# headers={
# "X-Amz-Target": "secretsmanager.DescribeSecret"
# },
# )
# json_data = json.loads(initial_description.data.decode("utf-8"))
# assert json_data # Returned dict is not empty
# assert json_data['RotationEnabled'] is False
# assert json_data['RotationRules']['AutomaticallyAfterDays'] == 0
# rotate_secret = test_client.post(
# '/',
# data={
# "SecretId": "test-secret",
# "RotationRules": {"AutomaticallyAfterDays": 42}
# },
# headers={
# "X-Amz-Target": "secretsmanager.RotateSecret"
# },
# )
# rotated_description = test_client.post(
# '/',
# data={
# "SecretId": "test-secret"
# },
# headers={
# "X-Amz-Target": "secretsmanager.DescribeSecret"
# },
# )
# json_data = json.loads(rotated_description.data.decode("utf-8"))
# assert json_data # Returned dict is not empty
# assert json_data['RotationEnabled'] is True
# assert json_data['RotationRules']['AutomaticallyAfterDays'] == 42
@mock_secretsmanager
def test_rotate_secret_that_does_not_exist():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
rotate_secret = test_client.post('/',
data={"SecretId": "i-dont-exist"},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_rotate_secret_that_does_not_match():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
rotate_secret = test_client.post('/',
data={"SecretId": "i-dont-match"},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "Secrets Manager can't find the specified secret"
assert json_data['__type'] == 'ResourceNotFoundException'
@mock_secretsmanager
def test_rotate_secret_client_request_token_too_short():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
client_request_token = "ED9F8B6C-85B7-B7E4-38F2A3BEB13C"
rotate_secret = test_client.post('/',
data={"SecretId": "test-secret",
"ClientRequestToken": client_request_token},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "ClientRequestToken must be 32-64 characters long."
assert json_data['__type'] == 'InvalidParameterException'
@mock_secretsmanager
def test_rotate_secret_client_request_token_too_long():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
client_request_token = (
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C-'
'ED9F8B6C-85B7-446A-B7E4-38F2A3BEB13C'
)
rotate_secret = test_client.post('/',
data={"SecretId": "test-secret",
"ClientRequestToken": client_request_token},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "ClientRequestToken must be 32-64 characters long."
assert json_data['__type'] == 'InvalidParameterException'
@mock_secretsmanager
def test_rotate_secret_rotation_lambda_arn_too_long():
backend = server.create_backend_app('secretsmanager')
test_client = backend.test_client()
create_secret = test_client.post('/',
data={"Name": "test-secret",
"SecretString": "foosecret"},
headers={
"X-Amz-Target": "secretsmanager.CreateSecret"
},
)
rotation_lambda_arn = '85B7-446A-B7E4' * 147 # == 2058 characters
rotate_secret = test_client.post('/',
data={"SecretId": "test-secret",
"RotationLambdaARN": rotation_lambda_arn},
headers={
"X-Amz-Target": "secretsmanager.RotateSecret"
},
)
json_data = json.loads(rotate_secret.data.decode("utf-8"))
assert json_data['message'] == "RotationLambdaARN must <= 2048 characters long."
assert json_data['__type'] == 'InvalidParameterException'
#
# The following tests should work, but fail on the embedded dict in
# RotationRules. The error message suggests a problem deeper in the code, which
# needs further investigation.
#
# @mock_secretsmanager
# def test_rotate_secret_rotation_period_zero():
# backend = server.create_backend_app('secretsmanager')
# test_client = backend.test_client()
# create_secret = test_client.post('/',
# data={"Name": "test-secret",
# "SecretString": "foosecret"},
# headers={
# "X-Amz-Target": "secretsmanager.CreateSecret"
# },
# )
# rotate_secret = test_client.post('/',
# data={"SecretId": "test-secret",
# "RotationRules": {"AutomaticallyAfterDays": 0}},
# headers={
# "X-Amz-Target": "secretsmanager.RotateSecret"
# },
# )
# json_data = json.loads(rotate_secret.data.decode("utf-8"))
# assert json_data['message'] == "RotationRules.AutomaticallyAfterDays must be within 1-1000."
# assert json_data['__type'] == 'InvalidParameterException'
# @mock_secretsmanager
# def test_rotate_secret_rotation_period_too_long():
# backend = server.create_backend_app('secretsmanager')
# test_client = backend.test_client()
# create_secret = test_client.post('/',
# data={"Name": "test-secret",
# "SecretString": "foosecret"},
# headers={
# "X-Amz-Target": "secretsmanager.CreateSecret"
# },
# )
# rotate_secret = test_client.post('/',
# data={"SecretId": "test-secret",
# "RotationRules": {"AutomaticallyAfterDays": 1001}},
# headers={
# "X-Amz-Target": "secretsmanager.RotateSecret"
# },
# )
# json_data = json.loads(rotate_secret.data.decode("utf-8"))
# assert json_data['message'] == "RotationRules.AutomaticallyAfterDays must be within 1-1000."
# assert json_data['__type'] == 'InvalidParameterException'