CloudFormation - Get/Set stack policy (no parsing) (#4628)

This commit is contained in:
Bert Blommers 2021-11-24 08:54:38 -01:00 committed by GitHub
parent d64fd52b57
commit 5fa932c3cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 168 additions and 0 deletions

View File

@ -234,6 +234,7 @@ class FakeStack(BaseModel):
self.role_arn = role_arn
self.tags = tags if tags else {}
self.events = []
self.policy = ""
self.cross_stack_resources = cross_stack_resources or {}
self.resource_map = self._create_resource_map()
@ -828,6 +829,23 @@ class CloudFormationBackend(BaseBackend):
stack.update(template, role_arn, parameters=resolved_parameters, tags=tags)
return stack
def get_stack_policy(self, stack_name):
try:
stack = self.get_stack(stack_name)
except ValidationError:
raise ValidationError(message=f"Stack: {stack_name} does not exist")
return stack.policy
def set_stack_policy(self, stack_name, policy_body):
"""
Note that Moto does no validation/parsing/enforcement of this policy - we simply persist it.
"""
try:
stack = self.get_stack(stack_name)
except ValidationError:
raise ValidationError(message=f"Stack: {stack_name} does not exist")
stack.policy = policy_body
def list_stack_resources(self, stack_name_or_id):
stack = self.get_stack(stack_name_or_id)
return stack.stack_resources

View File

@ -5,6 +5,7 @@ from urllib.parse import urlparse
from moto.core.responses import BaseResponse
from moto.core.utils import amzn_request_id
from moto.s3 import s3_backend
from moto.s3.exceptions import S3ClientError
from moto.core import ACCOUNT_ID
from .models import cloudformation_backends
from .exceptions import ValidationError, MissingParameterError
@ -632,6 +633,32 @@ class CloudFormationResponse(BaseResponse):
template = self.response_template(UPDATE_STACK_INSTANCES_RESPONSE_TEMPLATE)
return template.render(operation=operation)
def get_stack_policy(self):
stack_name = self._get_param("StackName")
policy = self.cloudformation_backend.get_stack_policy(stack_name)
template = self.response_template(GET_STACK_POLICY_RESPONSE)
return template.render(policy=policy)
def set_stack_policy(self):
stack_name = self._get_param("StackName")
policy_url = self._get_param("StackPolicyURL")
policy_body = self._get_param("StackPolicyBody")
if policy_body and policy_url:
raise ValidationError(
message="You cannot specify both StackPolicyURL and StackPolicyBody"
)
if policy_url:
try:
policy_body = self._get_stack_from_s3_url(policy_url)
except S3ClientError as s3_e:
raise ValidationError(
message=f"S3 error: Access Denied: {s3_e.error_type}"
)
self.cloudformation_backend.set_stack_policy(
stack_name, policy_body=policy_body
)
return SET_STACK_POLICY_RESPONSE
VALIDATE_STACK_RESPONSE_TEMPLATE = """<ValidateTemplateResponse>
<ValidateTemplateResult>
@ -1244,3 +1271,21 @@ GET_TEMPLATE_SUMMARY_TEMPLATE = """<GetTemplateSummaryResponse xmlns="http://clo
</ResponseMetadata>
</GetTemplateSummaryResponse>
"""
SET_STACK_POLICY_RESPONSE = """<SetStackPolicyResponse xmlns="http://cloudformation.amazonaws.com/doc/2010-05-15/">
<ResponseMetadata>
<RequestId>abe48993-e23f-4167-b703-5b0f1b6aa84f</RequestId>
</ResponseMetadata>
</SetStackPolicyResponse>"""
GET_STACK_POLICY_RESPONSE = """<GetStackPolicyResponse xmlns="http://cloudformation.amazonaws.com/doc/2010-05-15/">
<GetStackPolicyResult>
{% if policy %}
<StackPolicyBody>{{ policy }}</StackPolicyBody>
{% endif %}
</GetStackPolicyResult>
<ResponseMetadata>
<RequestId>e9e39eb6-1c05-4f0e-958a-b63f420e0a07</RequestId>
</ResponseMetadata>
</GetStackPolicyResponse>"""

View File

@ -0,0 +1,105 @@
import boto3
import json
import pytest
from botocore.exceptions import ClientError
from moto import mock_cloudformation, mock_s3
from .test_cloudformation_stack_crud_boto3 import dummy_template_json
@mock_cloudformation
def test_set_stack_policy_on_nonexisting_stack():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
cf_conn.set_stack_policy(StackName="unknown", StackPolicyBody="{}")
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationError")
err["Message"].should.equal("Stack: unknown does not exist")
err["Type"].should.equal("Sender")
@mock_cloudformation
def test_get_stack_policy_on_nonexisting_stack():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
cf_conn.get_stack_policy(StackName="unknown")
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationError")
err["Message"].should.equal("Stack: unknown does not exist")
err["Type"].should.equal("Sender")
@mock_cloudformation
def test_get_stack_policy_on_stack_without_policy():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json)
resp = cf_conn.get_stack_policy(StackName="test_stack")
resp.shouldnt.have.key("StackPolicyBody")
@mock_cloudformation
def test_set_stack_policy_with_both_body_and_url():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json)
with pytest.raises(ClientError) as exc:
cf_conn.set_stack_policy(
StackName="test_stack", StackPolicyBody="{}", StackPolicyURL="..."
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationError")
err["Message"].should.equal(
"You cannot specify both StackPolicyURL and StackPolicyBody"
)
err["Type"].should.equal("Sender")
@mock_cloudformation
def test_set_stack_policy_with_body():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json)
policy = json.dumps({"policy": "yes"})
cf_conn.set_stack_policy(StackName="test_stack", StackPolicyBody=policy)
resp = cf_conn.get_stack_policy(StackName="test_stack")
resp.should.have.key("StackPolicyBody").equals(policy)
@mock_cloudformation
@mock_s3
def test_set_stack_policy_with_url():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json)
policy = json.dumps({"policy": "yes"})
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="foobar")
s3.put_object(Bucket="foobar", Key="test", Body=policy)
key_url = s3.generate_presigned_url(
ClientMethod="get_object", Params={"Bucket": "foobar", "Key": "test"}
)
cf_conn.set_stack_policy(StackName="test_stack", StackPolicyURL=key_url)
resp = cf_conn.get_stack_policy(StackName="test_stack")
resp.should.have.key("StackPolicyBody").equals(policy)
@mock_cloudformation
@mock_s3
def test_set_stack_policy_with_url_pointing_to_unknown_key():
cf_conn = boto3.client("cloudformation", region_name="us-east-1")
cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json)
with pytest.raises(ClientError) as exc:
cf_conn.set_stack_policy(StackName="test_stack", StackPolicyURL="...")
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationError")
err["Message"].should.contain("S3 error: Access Denied")
err["Type"].should.equal("Sender")