SSM - Integrate with SecretsManager (#5117)

This commit is contained in:
Bert Blommers 2022-05-10 22:32:49 +00:00 committed by GitHub
parent ae6b28b5b9
commit 749b543b7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 218 additions and 33 deletions

View File

@ -281,9 +281,7 @@ class SecretsManagerBackend(BaseBackend):
):
raise SecretHasNoValueException(version_stage or "AWSCURRENT")
response = json.dumps(response_data)
return response
return response_data
def update_secret(
self,
@ -442,7 +440,7 @@ class SecretsManagerBackend(BaseBackend):
secret = self.secrets[secret_id]
return json.dumps(secret.to_dict())
return secret.to_dict()
def rotate_secret(
self,

View File

@ -30,13 +30,18 @@ def _validate_filters(filters):
class SecretsManagerResponse(BaseResponse):
@property
def backend(self):
return secretsmanager_backends[self.region]
def get_secret_value(self):
secret_id = self._get_param("SecretId")
version_id = self._get_param("VersionId")
version_stage = self._get_param("VersionStage")
return secretsmanager_backends[self.region].get_secret_value(
value = self.backend.get_secret_value(
secret_id=secret_id, version_id=version_id, version_stage=version_stage
)
return json.dumps(value)
def create_secret(self):
name = self._get_param("Name")
@ -46,7 +51,7 @@ class SecretsManagerResponse(BaseResponse):
tags = self._get_param("Tags", if_none=[])
kms_key_id = self._get_param("KmsKeyId", if_none=None)
client_request_token = self._get_param("ClientRequestToken", if_none=None)
return secretsmanager_backends[self.region].create_secret(
return self.backend.create_secret(
name=name,
secret_string=secret_string,
secret_binary=secret_binary,
@ -62,7 +67,7 @@ class SecretsManagerResponse(BaseResponse):
secret_binary = self._get_param("SecretBinary")
client_request_token = self._get_param("ClientRequestToken")
kms_key_id = self._get_param("KmsKeyId", if_none=None)
return secretsmanager_backends[self.region].update_secret(
return self.backend.update_secret(
secret_id=secret_id,
secret_string=secret_string,
secret_binary=secret_binary,
@ -81,7 +86,7 @@ class SecretsManagerResponse(BaseResponse):
require_each_included_type = self._get_param(
"RequireEachIncludedType", if_none=True
)
return secretsmanager_backends[self.region].get_random_password(
return self.backend.get_random_password(
password_length=password_length,
exclude_characters=exclude_characters,
exclude_numbers=exclude_numbers,
@ -94,14 +99,15 @@ class SecretsManagerResponse(BaseResponse):
def describe_secret(self):
secret_id = self._get_param("SecretId")
return secretsmanager_backends[self.region].describe_secret(secret_id=secret_id)
secret = self.backend.describe_secret(secret_id=secret_id)
return json.dumps(secret)
def rotate_secret(self):
client_request_token = self._get_param("ClientRequestToken")
rotation_lambda_arn = self._get_param("RotationLambdaARN")
rotation_rules = self._get_param("RotationRules")
secret_id = self._get_param("SecretId")
return secretsmanager_backends[self.region].rotate_secret(
return self.backend.rotate_secret(
secret_id=secret_id,
client_request_token=client_request_token,
rotation_lambda_arn=rotation_lambda_arn,
@ -121,7 +127,7 @@ class SecretsManagerResponse(BaseResponse):
if not isinstance(version_stages, list):
version_stages = [version_stages]
return secretsmanager_backends[self.region].put_secret_value(
return self.backend.put_secret_value(
secret_id=secret_id,
secret_binary=secret_binary,
secret_string=secret_string,
@ -131,16 +137,14 @@ class SecretsManagerResponse(BaseResponse):
def list_secret_version_ids(self):
secret_id = self._get_param("SecretId", if_none="")
return secretsmanager_backends[self.region].list_secret_version_ids(
secret_id=secret_id
)
return self.backend.list_secret_version_ids(secret_id=secret_id)
def list_secrets(self):
filters = self._get_param("Filters", if_none=[])
_validate_filters(filters)
max_results = self._get_int_param("MaxResults")
next_token = self._get_param("NextToken")
secret_list, next_token = secretsmanager_backends[self.region].list_secrets(
secret_list, next_token = self.backend.list_secrets(
filters=filters, max_results=max_results, next_token=next_token
)
return json.dumps(dict(SecretList=secret_list, NextToken=next_token))
@ -149,7 +153,7 @@ class SecretsManagerResponse(BaseResponse):
secret_id = self._get_param("SecretId")
recovery_window_in_days = self._get_param("RecoveryWindowInDays")
force_delete_without_recovery = self._get_param("ForceDeleteWithoutRecovery")
arn, name, deletion_date = secretsmanager_backends[self.region].delete_secret(
arn, name, deletion_date = self.backend.delete_secret(
secret_id=secret_id,
recovery_window_in_days=recovery_window_in_days,
force_delete_without_recovery=force_delete_without_recovery,
@ -158,35 +162,29 @@ class SecretsManagerResponse(BaseResponse):
def restore_secret(self):
secret_id = self._get_param("SecretId")
arn, name = secretsmanager_backends[self.region].restore_secret(
secret_id=secret_id
)
arn, name = self.backend.restore_secret(secret_id=secret_id)
return json.dumps(dict(ARN=arn, Name=name))
def get_resource_policy(self):
secret_id = self._get_param("SecretId")
return secretsmanager_backends[self.region].get_resource_policy(
secret_id=secret_id
)
return self.backend.get_resource_policy(secret_id=secret_id)
def tag_resource(self):
secret_id = self._get_param("SecretId")
tags = self._get_param("Tags", if_none=[])
return secretsmanager_backends[self.region].tag_resource(secret_id, tags)
return self.backend.tag_resource(secret_id, tags)
def untag_resource(self):
secret_id = self._get_param("SecretId")
tag_keys = self._get_param("TagKeys", if_none=[])
return secretsmanager_backends[self.region].untag_resource(
secret_id=secret_id, tag_keys=tag_keys
)
return self.backend.untag_resource(secret_id=secret_id, tag_keys=tag_keys)
def update_secret_version_stage(self):
secret_id = self._get_param("SecretId")
version_stage = self._get_param("VersionStage")
remove_from_version_id = self._get_param("RemoveFromVersionId")
move_to_version_id = self._get_param("MoveToVersionId")
return secretsmanager_backends[self.region].update_secret_version_stage(
return self.backend.update_secret_version_stage(
secret_id=secret_id,
version_stage=version_stage,
remove_from_version_id=remove_from_version_id,

View File

@ -8,6 +8,8 @@ from moto.core import get_account_id, BaseBackend, BaseModel
from moto.core.exceptions import RESTError
from moto.core.utils import BackendDict
from moto.ec2 import ec2_backends
from moto.secretsmanager import secretsmanager_backends
from moto.secretsmanager.exceptions import SecretsManagerClientError
from moto.utilities.utils import load_resource
import datetime
@ -44,9 +46,12 @@ from .exceptions import (
class ParameterDict(defaultdict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self, region_name):
# each value is a list of all of the versions for a parameter
# to get the current value, grab the last item of the list
super().__init__(list)
self.parameters_loaded = False
self.region_name = region_name
def _check_loading_status(self, key):
if not self.parameters_loaded and key and str(key).startswith("/aws"):
@ -81,11 +86,53 @@ class ParameterDict(defaultdict):
)
self.parameters_loaded = True
def _get_secretsmanager_parameter(self, secret_name):
secret = secretsmanager_backends[self.region_name].describe_secret(secret_name)
version_id_to_stage = secret["VersionIdsToStages"]
# Sort version ID's so that AWSCURRENT is last
sorted_version_ids = [
k for k in version_id_to_stage if "AWSCURRENT" not in version_id_to_stage[k]
] + [k for k in version_id_to_stage if "AWSCURRENT" in version_id_to_stage[k]]
values = [
secretsmanager_backends[self.region_name].get_secret_value(
secret_name,
version_id=version_id,
version_stage=None,
)
for version_id in sorted_version_ids
]
return [
Parameter(
name=secret["Name"],
value=val.get("SecretString"),
parameter_type="SecureString",
description=secret.get("Description"),
allowed_pattern=None,
keyid=None,
last_modified_date=secret["LastChangedDate"],
version=0,
data_type="text",
labels=[val.get("VersionId")] + val.get("VersionStages", []),
source_result=json.dumps(secret),
)
for val in values
]
def __getitem__(self, item):
if item.startswith("/aws/reference/secretsmanager/"):
return self._get_secretsmanager_parameter("/".join(item.split("/")[4:]))
self._check_loading_status(item)
return super().__getitem__(item)
def __contains__(self, k):
if k and k.startswith("/aws/reference/secretsmanager/"):
try:
param = self._get_secretsmanager_parameter("/".join(k.split("/")[4:]))
return param is not None
except SecretsManagerClientError:
raise ParameterNotFound(
f"An error occurred (ParameterNotFound) when referencing Secrets Manager: Secret {k} not found."
)
self._check_loading_status(k)
return super().__contains__(k)
@ -116,6 +163,8 @@ class Parameter(BaseModel):
version,
data_type,
tags=None,
labels=None,
source_result=None,
):
self.name = name
self.type = parameter_type
@ -126,7 +175,8 @@ class Parameter(BaseModel):
self.version = version
self.data_type = data_type
self.tags = tags or []
self.labels = []
self.labels = labels or []
self.source_result = source_result
if self.type == "SecureString":
if not self.keyid:
@ -156,6 +206,8 @@ class Parameter(BaseModel):
"LastModifiedDate": round(self.last_modified_date, 3),
"DataType": self.data_type,
}
if self.source_result:
r["SourceResult"] = self.source_result
if region:
r["ARN"] = parameter_arn(region, self.name)
@ -786,9 +838,7 @@ class SimpleSystemManagerBackend(BaseBackend):
def __init__(self, region):
super().__init__()
# each value is a list of all of the versions for a parameter
# to get the current value, grab the last item of the list
self._parameters = ParameterDict(list)
self._parameters = ParameterDict(region)
self._resource_tags = defaultdict(lambda: defaultdict(dict))
self._commands = []

View File

@ -1,6 +1,7 @@
import json
from moto.core.responses import BaseResponse
from .exceptions import ValidationException
from .models import ssm_backends
@ -178,6 +179,14 @@ class SimpleSystemManagerResponse(BaseResponse):
name = self._get_param("Name")
with_decryption = self._get_param("WithDecryption")
if (
name.startswith("/aws/reference/secretsmanager/")
and with_decryption is not True
):
raise ValidationException(
"WithDecryption flag must be True for retrieving a Secret Manager secret."
)
result = self.ssm_backend.get_parameter(name)
if result is None:

View File

@ -0,0 +1,130 @@
import boto3
import json
import pytest
from botocore.exceptions import ClientError
from moto import mock_ssm, mock_secretsmanager
# https://docs.aws.amazon.com/systems-manager/latest/userguide/integration-ps-secretsmanager.html
@mock_secretsmanager
@mock_ssm
def test_get_value_from_secrets_manager__by_name():
# given
ssm = boto3.client("ssm", "eu-north-1")
secrets_manager = boto3.client("secretsmanager", "eu-north-1")
secret_name = "mysecret"
# when
secrets_manager.create_secret(Name=secret_name, SecretString="some secret")
# then
param = ssm.get_parameter(
Name=f"/aws/reference/secretsmanager/{secret_name}", WithDecryption=True
)["Parameter"]
param.should.have.key("Name").equals("mysecret")
param.should.have.key("Type").equals("SecureString")
param.should.have.key("Value").equals("some secret")
param.should.have.key("Version").equals(0)
param.should.have.key("SourceResult")
secret = secrets_manager.describe_secret(SecretId=secret_name)
source_result = json.loads(param["SourceResult"])
source_result["ARN"].should.equal(secret["ARN"])
source_result["Name"].should.equal(secret["Name"])
source_result["VersionIdsToStages"].should.equal(secret["VersionIdsToStages"])
@mock_secretsmanager
@mock_ssm
def test_get_value_from_secrets_manager__without_decryption():
# Note that the parameter does not need to exist
ssm = boto3.client("ssm", "eu-north-1")
with pytest.raises(ClientError) as exc:
ssm.get_parameter(Name="/aws/reference/secretsmanager/sth")
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"WithDecryption flag must be True for retrieving a Secret Manager secret."
)
@mock_secretsmanager
@mock_ssm
def test_get_value_from_secrets_manager__with_decryption_false():
# Note that the parameter does not need to exist
ssm = boto3.client("ssm", "eu-north-1")
with pytest.raises(ClientError) as exc:
ssm.get_parameter(
Name="/aws/reference/secretsmanager/sth", WithDecryption=False
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"WithDecryption flag must be True for retrieving a Secret Manager secret."
)
@mock_secretsmanager
@mock_ssm
def test_get_value_from_secrets_manager__by_id():
# given
ssm = boto3.client("ssm", "eu-north-1")
secrets_manager = boto3.client("secretsmanager", "eu-north-1")
name = "mysecret"
# when
r1 = secrets_manager.create_secret(Name=name, SecretString="1st")
version_id1 = r1["VersionId"]
secrets_manager.put_secret_value(
SecretId=name, SecretString="2nd", VersionStages=["AWSCURRENT"]
)
r3 = secrets_manager.put_secret_value(
SecretId=name, SecretString="3rd", VersionStages=["ST1"]
)
version_id3 = r3["VersionId"]
# then
full_name = f"/aws/reference/secretsmanager/{name}:{version_id1}"
param = ssm.get_parameter(Name=full_name, WithDecryption=True)["Parameter"]
param.should.have.key("Value").equals("1st")
full_name = f"/aws/reference/secretsmanager/{name}"
param = ssm.get_parameter(Name=full_name, WithDecryption=True)["Parameter"]
param.should.have.key("Value").equals("2nd")
full_name = f"/aws/reference/secretsmanager/{name}:{version_id3}"
param = ssm.get_parameter(Name=full_name, WithDecryption=True)["Parameter"]
param.should.have.key("Value").equals("3rd")
@mock_secretsmanager
@mock_ssm
def test_get_value_from_secrets_manager__by_version():
# given
ssm = boto3.client("ssm", "eu-north-1")
secrets_manager = boto3.client("secretsmanager", "eu-north-1")
name = "mysecret"
# when
secrets_manager.create_secret(Name=name, SecretString="1st")
secrets_manager.put_secret_value(
SecretId=name, SecretString="2nd", VersionStages=["AWSCURRENT"]
)
# then
full_name = f"/aws/reference/secretsmanager/{name}:AWSPREVIOUS"
param = ssm.get_parameter(Name=full_name, WithDecryption=True)["Parameter"]
param.should.have.key("Value").equals("1st")
@mock_secretsmanager
@mock_ssm
def test_get_value_from_secrets_manager__param_does_not_exist():
ssm = boto3.client("ssm", "us-east-1")
with pytest.raises(ClientError) as exc:
ssm.get_parameter(
Name="/aws/reference/secretsmanager/test", WithDecryption=True
)
err = exc.value.response["Error"]
err["Code"].should.equal("ParameterNotFound")
err["Message"].should.equal(
"An error occurred (ParameterNotFound) when referencing Secrets Manager: Secret /aws/reference/secretsmanager/test not found."
)