feat: Adding validation to Cloudformation to reject same template_body (#4351)

This commit is contained in:
teddylear 2021-09-28 16:35:46 -04:00 committed by GitHub
parent 31866fb5bd
commit 8c2d0b0557
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 84 additions and 18 deletions

View File

@ -9,7 +9,7 @@ from moto.core.utils import amzn_request_id
from moto.s3 import s3_backend
from moto.core import ACCOUNT_ID
from .models import cloudformation_backends
from .exceptions import ValidationError
from .exceptions import ValidationError, MissingParameterError
from .utils import yaml_tag_constructor
@ -64,6 +64,31 @@ class CloudFormationResponse(BaseResponse):
key = s3_backend.get_object(bucket_name, key_name)
return key.value.decode("utf-8")
def _get_params_from_list(self, parameters_list):
# Hack dict-comprehension
return dict(
[
(parameter["parameter_key"], parameter["parameter_value"])
for parameter in parameters_list
]
)
def _get_param_values(self, parameters_list, existing_params):
result = {}
for parameter in parameters_list:
if parameter.keys() >= {"parameter_key", "parameter_value"}:
result[parameter["parameter_key"]] = parameter["parameter_value"]
elif (
parameter.keys() >= {"parameter_key", "use_previous_value"}
and parameter["parameter_key"] in existing_params
):
result[parameter["parameter_key"]] = existing_params[
parameter["parameter_key"]
]
else:
raise MissingParameterError(parameter["parameter_key"])
return result
def create_stack(self):
stack_name = self._get_param("StackName")
stack_body = self._get_param("TemplateBody")
@ -81,13 +106,8 @@ class CloudFormationResponse(BaseResponse):
)
return 400, {"status": 400}, template.render(name=stack_name)
# Hack dict-comprehension
parameters = dict(
[
(parameter["parameter_key"], parameter["parameter_value"])
for parameter in parameters_list
]
)
parameters = self._get_params_from_list(parameters_list)
if template_url:
stack_body = self._get_stack_from_s3_url(template_url)
stack_notification_arns = self._get_multi_param("NotificationARNs.member")
@ -312,6 +332,22 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(GET_TEMPLATE_SUMMARY_TEMPLATE)
return template.render(template_summary=template_summary)
def _validate_different_update(self, incoming_params, stack_body, old_stack):
if incoming_params and stack_body:
new_params = self._get_param_values(incoming_params, old_stack.parameters)
if old_stack.template == stack_body and old_stack.parameters == new_params:
raise ValidationError(
old_stack.name, message=f"Stack [{old_stack.name}] already exists",
)
def _validate_status(self, stack):
if stack.status == "ROLLBACK_COMPLETE":
raise ValidationError(
stack.stack_id,
message="Stack:{0} is in ROLLBACK_COMPLETE state and can not "
"be updated.".format(stack.stack_id),
)
def update_stack(self):
stack_name = self._get_param("StackName")
role_arn = self._get_param("RoleARN")
@ -336,13 +372,8 @@ class CloudFormationResponse(BaseResponse):
tags = None
stack = self.cloudformation_backend.get_stack(stack_name)
if stack.status == "ROLLBACK_COMPLETE":
raise ValidationError(
stack.stack_id,
message="Stack:{0} is in ROLLBACK_COMPLETE state and can not be updated.".format(
stack.stack_id
),
)
self._validate_different_update(incoming_params, stack_body, stack)
self._validate_status(stack)
stack = self.cloudformation_backend.update_stack(
name=stack_name,

View File

@ -3,12 +3,10 @@ from __future__ import unicode_literals
import json
from collections import OrderedDict
from datetime import datetime, timedelta
from freezegun import freeze_time
import pytz
import boto3
from botocore.exceptions import ClientError, ValidationError
import sure # noqa
from botocore.exceptions import ClientError
import pytest
@ -1023,6 +1021,43 @@ def test_boto3_update_stack_fail_missing_new_parameter():
)
@mock_cloudformation
def test_boto3_update_stack_fail_update_same_template_body():
name = "update_stack_with_previous_value"
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
params = [
{"ParameterKey": "TagName", "ParameterValue": "foo"},
{"ParameterKey": "TagDescription", "ParameterValue": "bar"},
]
cf_conn.create_stack(
StackName=name, TemplateBody=dummy_template_yaml_with_ref, Parameters=params,
)
with pytest.raises(ClientError) as exp:
cf_conn.update_stack(
StackName=name,
TemplateBody=dummy_template_yaml_with_ref,
Parameters=params,
)
exp_err = exp.value.response.get("Error")
exp_metadata = exp.value.response.get("ResponseMetadata")
exp_err.get("Code").should.equal("ValidationError")
exp_err.get("Message").should.equal(f"Stack [{name}] already exists")
exp_metadata.get("HTTPStatusCode").should.equal(400)
cf_conn.update_stack(
StackName=name,
TemplateBody=dummy_template_yaml_with_ref,
Parameters=[
{"ParameterKey": "TagName", "ParameterValue": "new_foo"},
{"ParameterKey": "TagDescription", "ParameterValue": "new_bar"},
],
)
@mock_cloudformation
def test_boto3_update_stack_deleted_resources_can_reference_deleted_parameters():