diff --git a/moto/cloudformation/models.py b/moto/cloudformation/models.py index be02b0229..9ca85dff8 100644 --- a/moto/cloudformation/models.py +++ b/moto/cloudformation/models.py @@ -234,6 +234,7 @@ class FakeStack(BaseModel): self.role_arn = role_arn self.tags = tags if tags else {} self.events = [] + self.policy = "" self.cross_stack_resources = cross_stack_resources or {} self.resource_map = self._create_resource_map() @@ -828,6 +829,23 @@ class CloudFormationBackend(BaseBackend): stack.update(template, role_arn, parameters=resolved_parameters, tags=tags) return stack + def get_stack_policy(self, stack_name): + try: + stack = self.get_stack(stack_name) + except ValidationError: + raise ValidationError(message=f"Stack: {stack_name} does not exist") + return stack.policy + + def set_stack_policy(self, stack_name, policy_body): + """ + Note that Moto does no validation/parsing/enforcement of this policy - we simply persist it. + """ + try: + stack = self.get_stack(stack_name) + except ValidationError: + raise ValidationError(message=f"Stack: {stack_name} does not exist") + stack.policy = policy_body + def list_stack_resources(self, stack_name_or_id): stack = self.get_stack(stack_name_or_id) return stack.stack_resources diff --git a/moto/cloudformation/responses.py b/moto/cloudformation/responses.py index b225d59eb..028a5bb19 100644 --- a/moto/cloudformation/responses.py +++ b/moto/cloudformation/responses.py @@ -5,6 +5,7 @@ from urllib.parse import urlparse from moto.core.responses import BaseResponse from moto.core.utils import amzn_request_id from moto.s3 import s3_backend +from moto.s3.exceptions import S3ClientError from moto.core import ACCOUNT_ID from .models import cloudformation_backends from .exceptions import ValidationError, MissingParameterError @@ -632,6 +633,32 @@ class CloudFormationResponse(BaseResponse): template = self.response_template(UPDATE_STACK_INSTANCES_RESPONSE_TEMPLATE) return template.render(operation=operation) + def get_stack_policy(self): + stack_name = self._get_param("StackName") + policy = self.cloudformation_backend.get_stack_policy(stack_name) + template = self.response_template(GET_STACK_POLICY_RESPONSE) + return template.render(policy=policy) + + def set_stack_policy(self): + stack_name = self._get_param("StackName") + policy_url = self._get_param("StackPolicyURL") + policy_body = self._get_param("StackPolicyBody") + if policy_body and policy_url: + raise ValidationError( + message="You cannot specify both StackPolicyURL and StackPolicyBody" + ) + if policy_url: + try: + policy_body = self._get_stack_from_s3_url(policy_url) + except S3ClientError as s3_e: + raise ValidationError( + message=f"S3 error: Access Denied: {s3_e.error_type}" + ) + self.cloudformation_backend.set_stack_policy( + stack_name, policy_body=policy_body + ) + return SET_STACK_POLICY_RESPONSE + VALIDATE_STACK_RESPONSE_TEMPLATE = """ @@ -1244,3 +1271,21 @@ GET_TEMPLATE_SUMMARY_TEMPLATE = """ + + abe48993-e23f-4167-b703-5b0f1b6aa84f + +""" + + +GET_STACK_POLICY_RESPONSE = """ + + {% if policy %} + {{ policy }} + {% endif %} + + + e9e39eb6-1c05-4f0e-958a-b63f420e0a07 + +""" diff --git a/tests/test_cloudformation/test_cloudformation_stack_policies.py b/tests/test_cloudformation/test_cloudformation_stack_policies.py new file mode 100644 index 000000000..c5369fa36 --- /dev/null +++ b/tests/test_cloudformation/test_cloudformation_stack_policies.py @@ -0,0 +1,105 @@ +import boto3 +import json + +import pytest + +from botocore.exceptions import ClientError +from moto import mock_cloudformation, mock_s3 +from .test_cloudformation_stack_crud_boto3 import dummy_template_json + + +@mock_cloudformation +def test_set_stack_policy_on_nonexisting_stack(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + + with pytest.raises(ClientError) as exc: + cf_conn.set_stack_policy(StackName="unknown", StackPolicyBody="{}") + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationError") + err["Message"].should.equal("Stack: unknown does not exist") + err["Type"].should.equal("Sender") + + +@mock_cloudformation +def test_get_stack_policy_on_nonexisting_stack(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + + with pytest.raises(ClientError) as exc: + cf_conn.get_stack_policy(StackName="unknown") + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationError") + err["Message"].should.equal("Stack: unknown does not exist") + err["Type"].should.equal("Sender") + + +@mock_cloudformation +def test_get_stack_policy_on_stack_without_policy(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json) + + resp = cf_conn.get_stack_policy(StackName="test_stack") + resp.shouldnt.have.key("StackPolicyBody") + + +@mock_cloudformation +def test_set_stack_policy_with_both_body_and_url(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json) + + with pytest.raises(ClientError) as exc: + cf_conn.set_stack_policy( + StackName="test_stack", StackPolicyBody="{}", StackPolicyURL="..." + ) + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationError") + err["Message"].should.equal( + "You cannot specify both StackPolicyURL and StackPolicyBody" + ) + err["Type"].should.equal("Sender") + + +@mock_cloudformation +def test_set_stack_policy_with_body(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json) + + policy = json.dumps({"policy": "yes"}) + + cf_conn.set_stack_policy(StackName="test_stack", StackPolicyBody=policy) + + resp = cf_conn.get_stack_policy(StackName="test_stack") + resp.should.have.key("StackPolicyBody").equals(policy) + + +@mock_cloudformation +@mock_s3 +def test_set_stack_policy_with_url(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json) + + policy = json.dumps({"policy": "yes"}) + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket="foobar") + s3.put_object(Bucket="foobar", Key="test", Body=policy) + key_url = s3.generate_presigned_url( + ClientMethod="get_object", Params={"Bucket": "foobar", "Key": "test"} + ) + + cf_conn.set_stack_policy(StackName="test_stack", StackPolicyURL=key_url) + + resp = cf_conn.get_stack_policy(StackName="test_stack") + resp.should.have.key("StackPolicyBody").equals(policy) + + +@mock_cloudformation +@mock_s3 +def test_set_stack_policy_with_url_pointing_to_unknown_key(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + cf_conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json) + + with pytest.raises(ClientError) as exc: + cf_conn.set_stack_policy(StackName="test_stack", StackPolicyURL="...") + err = exc.value.response["Error"] + err["Code"].should.equal("ValidationError") + err["Message"].should.contain("S3 error: Access Denied") + err["Type"].should.equal("Sender")