Fix: CloudFormation support status filter in list stacks (#3115)

* Fix: CloudFormation support status filter in list stacks

* Added test for non decorator

Co-authored-by: usmankb <usman@krazybee.com>
This commit is contained in:
usmangani1 2020-07-12 18:09:42 +05:30 committed by GitHub
parent 069c159492
commit c5de56ce70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 3 deletions

View File

@ -449,6 +449,16 @@ class FakeEvent(BaseModel):
self.event_id = uuid.uuid4() self.event_id = uuid.uuid4()
def filter_stacks(all_stacks, status_filter):
filtered_stacks = []
if not status_filter:
return all_stacks
for stack in all_stacks:
if stack.status in status_filter:
filtered_stacks.append(stack)
return filtered_stacks
class CloudFormationBackend(BaseBackend): class CloudFormationBackend(BaseBackend):
def __init__(self): def __init__(self):
self.stacks = OrderedDict() self.stacks = OrderedDict()
@ -681,10 +691,11 @@ class CloudFormationBackend(BaseBackend):
def list_change_sets(self): def list_change_sets(self):
return self.change_sets.values() return self.change_sets.values()
def list_stacks(self): def list_stacks(self, status_filter=None):
return [v for v in self.stacks.values()] + [ total_stacks = [v for v in self.stacks.values()] + [
v for v in self.deleted_stacks.values() v for v in self.deleted_stacks.values()
] ]
return filter_stacks(total_stacks, status_filter)
def get_stack(self, name_or_stack_id): def get_stack(self, name_or_stack_id):
all_stacks = dict(self.deleted_stacks, **self.stacks) all_stacks = dict(self.deleted_stacks, **self.stacks)

View File

@ -233,7 +233,8 @@ class CloudFormationResponse(BaseResponse):
return template.render(change_sets=change_sets) return template.render(change_sets=change_sets)
def list_stacks(self): def list_stacks(self):
stacks = self.cloudformation_backend.list_stacks() status_filter = self._get_multi_param("StackStatusFilter.member")
stacks = self.cloudformation_backend.list_stacks(status_filter)
template = self.response_template(LIST_STACKS_RESPONSE) template = self.response_template(LIST_STACKS_RESPONSE)
return template.render(stacks=stacks) return template.render(stacks=stacks)

View File

@ -233,6 +233,19 @@ def test_list_stacks():
stacks[0].template_description.should.equal("Stack 1") stacks[0].template_description.should.equal("Stack 1")
@mock_cloudformation_deprecated
def test_list_stacks_with_filter():
conn = boto.connect_cloudformation()
conn.create_stack("test_stack", template_body=dummy_template_json)
conn.create_stack("test_stack2", template_body=dummy_template_json)
conn.update_stack("test_stack", template_body=dummy_template_json2)
stacks = conn.list_stacks("CREATE_COMPLETE")
stacks.should.have.length_of(1)
stacks[0].template_description.should.equal("Stack 1")
stacks = conn.list_stacks("UPDATE_COMPLETE")
stacks.should.have.length_of(1)
@mock_cloudformation_deprecated @mock_cloudformation_deprecated
def test_delete_stack_by_name(): def test_delete_stack_by_name():
conn = boto.connect_cloudformation() conn = boto.connect_cloudformation()

View File

@ -14,6 +14,7 @@ from nose.tools import assert_raises
from moto import mock_cloudformation, mock_s3, mock_sqs, mock_ec2 from moto import mock_cloudformation, mock_s3, mock_sqs, mock_ec2
from moto.core import ACCOUNT_ID from moto.core import ACCOUNT_ID
from .test_cloudformation_stack_crud import dummy_template_json2
dummy_template = { dummy_template = {
"AWSTemplateFormatVersion": "2010-09-09", "AWSTemplateFormatVersion": "2010-09-09",
@ -218,6 +219,18 @@ def test_boto3_list_stacksets_length():
stacksets.should.have.length_of(2) stacksets.should.have.length_of(2)
@mock_cloudformation
def test_boto3_filter_stacks():
conn = boto3.client("cloudformation", region_name="us-east-1")
conn.create_stack(StackName="test_stack", TemplateBody=dummy_template_json)
conn.create_stack(StackName="test_stack2", TemplateBody=dummy_template_json)
conn.update_stack(StackName="test_stack", TemplateBody=dummy_template_json2)
stacks = conn.list_stacks(StackStatusFilter=["CREATE_COMPLETE"])
stacks.get("StackSummaries").should.have.length_of(1)
stacks = conn.list_stacks(StackStatusFilter=["UPDATE_COMPLETE"])
stacks.get("StackSummaries").should.have.length_of(1)
@mock_cloudformation @mock_cloudformation
def test_boto3_list_stacksets_contents(): def test_boto3_list_stacksets_contents():
cf_conn = boto3.client("cloudformation", region_name="us-east-1") cf_conn = boto3.client("cloudformation", region_name="us-east-1")