From da07adae525a51849b5b18b6f840e979e2d364fc Mon Sep 17 00:00:00 2001 From: jweite Date: Mon, 3 Aug 2020 11:04:05 -0400 Subject: [PATCH] * Support for CloudFormation update and delete of Kinesis Streams (#3212) * Support for CloudFormation stack resource deletion via backend resource method delete_from_cloudformation_json() via parse_and_delete_resource(). * Correction to the inappropriate inclusion of EndingSequenceNumber in open shards. This attribute should only appear in closed shards. This regretfully prevents confirmation of consistent record counts after split/merge in unit tests. * Added parameters/decorator to CloudFormationModel method declarations to calm-down Pycharm. Co-authored-by: Joseph Weitekamp --- moto/cloudformation/parsing.py | 17 +++ moto/core/models.py | 18 ++- moto/kinesis/models.py | 89 +++++++++-- tests/test_kinesis/test_kinesis.py | 36 ++--- .../test_kinesis_cloudformation.py | 144 ++++++++++++++++++ 5 files changed, 268 insertions(+), 36 deletions(-) create mode 100644 tests/test_kinesis/test_kinesis_cloudformation.py diff --git a/moto/cloudformation/parsing.py b/moto/cloudformation/parsing.py index a1e1bb18b..272856367 100644 --- a/moto/cloudformation/parsing.py +++ b/moto/cloudformation/parsing.py @@ -649,6 +649,23 @@ class ResourceMap(collections_abc.Mapping): try: if parsed_resource and hasattr(parsed_resource, "delete"): parsed_resource.delete(self._region_name) + else: + resource_name_attribute = ( + parsed_resource.cloudformation_name_type() + if hasattr(parsed_resource, "cloudformation_name_type") + else resource_name_property_from_type(parsed_resource.type) + ) + if resource_name_attribute: + resource_json = self._resource_json_map[ + parsed_resource.logical_resource_id + ] + resource_name = resource_json["Properties"][ + resource_name_attribute + ] + parse_and_delete_resource( + resource_name, resource_json, self, self._region_name + ) + self._parsed_resources.pop(parsed_resource.logical_resource_id) except Exception as e: # skip over dependency violations, and try again in a # second pass diff --git a/moto/core/models.py b/moto/core/models.py index cf78be3f8..ae241322c 100644 --- a/moto/core/models.py +++ b/moto/core/models.py @@ -538,21 +538,25 @@ class BaseModel(object): # Parent class for every Model that can be instantiated by CloudFormation # On subclasses, implement the two methods as @staticmethod to ensure correct behaviour of the CF parser class CloudFormationModel(BaseModel): + @staticmethod @abstractmethod - def cloudformation_name_type(self): + def cloudformation_name_type(): # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-name.html # This must be implemented as a staticmethod with no parameters # Return None for resources that do not have a name property pass + @staticmethod @abstractmethod - def cloudformation_type(self): + def cloudformation_type(): # This must be implemented as a staticmethod with no parameters # See for example https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html return "AWS::SERVICE::RESOURCE" @abstractmethod - def create_from_cloudformation_json(self): + def create_from_cloudformation_json( + cls, resource_name, cloudformation_json, region_name + ): # This must be implemented as a classmethod with parameters: # cls, resource_name, cloudformation_json, region_name # Extract the resource parameters from the cloudformation json @@ -560,7 +564,9 @@ class CloudFormationModel(BaseModel): pass @abstractmethod - def update_from_cloudformation_json(self): + def update_from_cloudformation_json( + cls, original_resource, new_resource_name, cloudformation_json, region_name + ): # This must be implemented as a classmethod with parameters: # cls, original_resource, new_resource_name, cloudformation_json, region_name # Extract the resource parameters from the cloudformation json, @@ -569,7 +575,9 @@ class CloudFormationModel(BaseModel): pass @abstractmethod - def delete_from_cloudformation_json(self): + def delete_from_cloudformation_json( + cls, resource_name, cloudformation_json, region_name + ): # This must be implemented as a classmethod with parameters: # cls, resource_name, cloudformation_json, region_name # Extract the resource parameters from the cloudformation json diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index c4b04d924..a9c4f5476 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -53,6 +53,7 @@ class Shard(BaseModel): self.starting_hash = starting_hash self.ending_hash = ending_hash self.records = OrderedDict() + self.is_open = True @property def shard_id(self): @@ -116,29 +117,41 @@ class Shard(BaseModel): return r.sequence_number def to_json(self): - return { + response = { "HashKeyRange": { "EndingHashKey": str(self.ending_hash), "StartingHashKey": str(self.starting_hash), }, "SequenceNumberRange": { - "EndingSequenceNumber": self.get_max_sequence_number(), "StartingSequenceNumber": self.get_min_sequence_number(), }, "ShardId": self.shard_id, } + if not self.is_open: + response["SequenceNumberRange"][ + "EndingSequenceNumber" + ] = self.get_max_sequence_number() + return response class Stream(CloudFormationModel): - def __init__(self, stream_name, shard_count, region): + def __init__(self, stream_name, shard_count, region_name): self.stream_name = stream_name - self.shard_count = shard_count self.creation_datetime = datetime.datetime.now() - self.region = region + self.region = region_name self.account_number = ACCOUNT_ID self.shards = {} self.tags = {} self.status = "ACTIVE" + self.shard_count = None + self.update_shard_count(shard_count) + + def update_shard_count(self, shard_count): + # ToDo: This was extracted from init. It's only accurate for new streams. + # It doesn't (yet) try to accurately mimic the more complex re-sharding behavior. + # It makes the stream as if it had been created with this number of shards. + # Logically consistent, but not what AWS does. + self.shard_count = shard_count step = 2 ** 128 // shard_count hash_ranges = itertools.chain( @@ -146,7 +159,6 @@ class Stream(CloudFormationModel): [(shard_count - 1, (shard_count - 1) * step, 2 ** 128)], ) for index, start, end in hash_ranges: - shard = Shard(index, start, end) self.shards[shard.shard_id] = shard @@ -229,10 +241,65 @@ class Stream(CloudFormationModel): def create_from_cloudformation_json( cls, resource_name, cloudformation_json, region_name ): - properties = cloudformation_json["Properties"] - region = properties.get("Region", "us-east-1") + properties = cloudformation_json.get("Properties", {}) shard_count = properties.get("ShardCount", 1) - return Stream(properties["Name"], shard_count, region) + name = properties.get("Name", resource_name) + backend = kinesis_backends[region_name] + return backend.create_stream(name, shard_count, region_name) + + @classmethod + def update_from_cloudformation_json( + cls, original_resource, new_resource_name, cloudformation_json, region_name, + ): + properties = cloudformation_json["Properties"] + + if Stream.is_replacement_update(properties): + resource_name_property = cls.cloudformation_name_type() + if resource_name_property not in properties: + properties[resource_name_property] = new_resource_name + new_resource = cls.create_from_cloudformation_json( + properties[resource_name_property], cloudformation_json, region_name + ) + properties[resource_name_property] = original_resource.name + cls.delete_from_cloudformation_json( + original_resource.name, cloudformation_json, region_name + ) + return new_resource + + else: # No Interruption + if "ShardCount" in properties: + original_resource.update_shard_count(properties["ShardCount"]) + return original_resource + + @classmethod + def delete_from_cloudformation_json( + cls, resource_name, cloudformation_json, region_name + ): + backend = kinesis_backends[region_name] + properties = cloudformation_json.get("Properties", {}) + stream_name = properties.get(cls.cloudformation_name_type(), resource_name) + backend.delete_stream(stream_name) + + @staticmethod + def is_replacement_update(properties): + properties_requiring_replacement_update = ["BucketName", "ObjectLockEnabled"] + return any( + [ + property_requiring_replacement in properties + for property_requiring_replacement in properties_requiring_replacement_update + ] + ) + + def get_cfn_attribute(self, attribute_name): + from moto.cloudformation.exceptions import UnformattedGetAttTemplateException + + if attribute_name == "Arn": + return self.arn + raise UnformattedGetAttTemplateException() + + @property + def physical_resource_id(self): + return self.stream_name class FirehoseRecord(BaseModel): @@ -331,10 +398,10 @@ class KinesisBackend(BaseBackend): self.streams = OrderedDict() self.delivery_streams = {} - def create_stream(self, stream_name, shard_count, region): + def create_stream(self, stream_name, shard_count, region_name): if stream_name in self.streams: raise ResourceInUseError(stream_name) - stream = Stream(stream_name, shard_count, region) + stream = Stream(stream_name, shard_count, region_name) self.streams[stream_name] = stream return stream diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index b3251bb0f..85f248572 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -10,6 +10,8 @@ from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentEx from moto import mock_kinesis, mock_kinesis_deprecated from moto.core import ACCOUNT_ID +import sure # noqa + @mock_kinesis_deprecated def test_create_cluster(): @@ -601,9 +603,6 @@ def test_split_shard(): stream = stream_response["StreamDescription"] shards = stream["Shards"] shards.should.have.length_of(2) - sum( - [shard["SequenceNumberRange"]["EndingSequenceNumber"] for shard in shards] - ).should.equal(99) shard_range = shards[0]["HashKeyRange"] new_starting_hash = ( @@ -616,9 +615,6 @@ def test_split_shard(): stream = stream_response["StreamDescription"] shards = stream["Shards"] shards.should.have.length_of(3) - sum( - [shard["SequenceNumberRange"]["EndingSequenceNumber"] for shard in shards] - ).should.equal(99) shard_range = shards[2]["HashKeyRange"] new_starting_hash = ( @@ -631,9 +627,6 @@ def test_split_shard(): stream = stream_response["StreamDescription"] shards = stream["Shards"] shards.should.have.length_of(4) - sum( - [shard["SequenceNumberRange"]["EndingSequenceNumber"] for shard in shards] - ).should.equal(99) @mock_kinesis_deprecated @@ -662,9 +655,6 @@ def test_merge_shards(): stream = stream_response["StreamDescription"] shards = stream["Shards"] shards.should.have.length_of(4) - sum( - [shard["SequenceNumberRange"]["EndingSequenceNumber"] for shard in shards] - ).should.equal(99) conn.merge_shards(stream_name, "shardId-000000000000", "shardId-000000000001") @@ -672,17 +662,23 @@ def test_merge_shards(): stream = stream_response["StreamDescription"] shards = stream["Shards"] - shards.should.have.length_of(3) - sum( - [shard["SequenceNumberRange"]["EndingSequenceNumber"] for shard in shards] - ).should.equal(99) + active_shards = [ + shard + for shard in shards + if "EndingSequenceNumber" not in shard["SequenceNumberRange"] + ] + active_shards.should.have.length_of(3) + conn.merge_shards(stream_name, "shardId-000000000002", "shardId-000000000000") stream_response = conn.describe_stream(stream_name) stream = stream_response["StreamDescription"] shards = stream["Shards"] - shards.should.have.length_of(2) - sum( - [shard["SequenceNumberRange"]["EndingSequenceNumber"] for shard in shards] - ).should.equal(99) + active_shards = [ + shard + for shard in shards + if "EndingSequenceNumber" not in shard["SequenceNumberRange"] + ] + + active_shards.should.have.length_of(2) diff --git a/tests/test_kinesis/test_kinesis_cloudformation.py b/tests/test_kinesis/test_kinesis_cloudformation.py new file mode 100644 index 000000000..7f3aef0de --- /dev/null +++ b/tests/test_kinesis/test_kinesis_cloudformation.py @@ -0,0 +1,144 @@ +import boto3 +import sure # noqa + +from moto import mock_kinesis, mock_cloudformation + + +@mock_cloudformation +def test_kinesis_cloudformation_create_stream(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + + stack_name = "MyStack" + + template = '{"Resources":{"MyStream":{"Type":"AWS::Kinesis::Stream"}}}' + + cf_conn.create_stack(StackName=stack_name, TemplateBody=template) + + provisioned_resource = cf_conn.list_stack_resources(StackName=stack_name)[ + "StackResourceSummaries" + ][0] + provisioned_resource["LogicalResourceId"].should.equal("MyStream") + len(provisioned_resource["PhysicalResourceId"]).should.be.greater_than(0) + + +@mock_cloudformation +@mock_kinesis +def test_kinesis_cloudformation_get_attr(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + + stack_name = "MyStack" + + template = """ +Resources: + TheStream: + Type: AWS::Kinesis::Stream +Outputs: + StreamName: + Value: !Ref TheStream + StreamArn: + Value: !GetAtt TheStream.Arn +""".strip() + + cf_conn.create_stack(StackName=stack_name, TemplateBody=template) + stack_description = cf_conn.describe_stacks(StackName=stack_name)["Stacks"][0] + output_stream_name = [ + output["OutputValue"] + for output in stack_description["Outputs"] + if output["OutputKey"] == "StreamName" + ][0] + output_stream_arn = [ + output["OutputValue"] + for output in stack_description["Outputs"] + if output["OutputKey"] == "StreamArn" + ][0] + + kinesis_conn = boto3.client("kinesis", region_name="us-east-1") + stream_description = kinesis_conn.describe_stream(StreamName=output_stream_name)[ + "StreamDescription" + ] + output_stream_arn.should.equal(stream_description["StreamARN"]) + + +@mock_cloudformation +@mock_kinesis +def test_kinesis_cloudformation_update(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + + stack_name = "MyStack" + + template = """ +Resources: + TheStream: + Type: AWS::Kinesis::Stream + Properties: + Name: MyStream + ShardCount: 4 +""".strip() + + cf_conn.create_stack(StackName=stack_name, TemplateBody=template) + stack_description = cf_conn.describe_stacks(StackName=stack_name)["Stacks"][0] + stack_description["StackName"].should.equal(stack_name) + + kinesis_conn = boto3.client("kinesis", region_name="us-east-1") + stream_description = kinesis_conn.describe_stream(StreamName="MyStream")[ + "StreamDescription" + ] + shards_provisioned = len( + [ + shard + for shard in stream_description["Shards"] + if "EndingSequenceNumber" not in shard["SequenceNumberRange"] + ] + ) + shards_provisioned.should.equal(4) + + template = """ + Resources: + TheStream: + Type: AWS::Kinesis::Stream + Properties: + ShardCount: 6 + """.strip() + cf_conn.update_stack(StackName=stack_name, TemplateBody=template) + + stream_description = kinesis_conn.describe_stream(StreamName="MyStream")[ + "StreamDescription" + ] + shards_provisioned = len( + [ + shard + for shard in stream_description["Shards"] + if "EndingSequenceNumber" not in shard["SequenceNumberRange"] + ] + ) + shards_provisioned.should.equal(6) + + +@mock_cloudformation +@mock_kinesis +def test_kinesis_cloudformation_delete(): + cf_conn = boto3.client("cloudformation", region_name="us-east-1") + + stack_name = "MyStack" + + template = """ +Resources: + TheStream: + Type: AWS::Kinesis::Stream + Properties: + Name: MyStream +""".strip() + + cf_conn.create_stack(StackName=stack_name, TemplateBody=template) + stack_description = cf_conn.describe_stacks(StackName=stack_name)["Stacks"][0] + stack_description["StackName"].should.equal(stack_name) + + kinesis_conn = boto3.client("kinesis", region_name="us-east-1") + stream_description = kinesis_conn.describe_stream(StreamName="MyStream")[ + "StreamDescription" + ] + stream_description["StreamName"].should.equal("MyStream") + + cf_conn.delete_stack(StackName=stack_name) + streams = kinesis_conn.list_streams()["StreamNames"] + len(streams).should.equal(0)