From a48c8110693fac3ce94e62fc99993cc09921c163 Mon Sep 17 00:00:00 2001 From: Shubham Gupta <14368181+goodhamgupta@users.noreply.github.com> Date: Wed, 14 Jul 2021 22:36:30 +0800 Subject: [PATCH] #4067: Add support for kinesis retention hours (#4068) --- IMPLEMENTATION_COVERAGE.md | 4 +- moto/kinesis/models.py | 24 ++++++++++- moto/kinesis/responses.py | 16 +++++++ tests/test_kinesis/test_kinesis.py | 68 ++++++++++++++++++++++++++++++ 4 files changed, 109 insertions(+), 3 deletions(-) diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index b120ee26d..7191fb020 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -6216,7 +6216,7 @@ - [X] add_tags_to_stream - [X] create_stream -- [ ] decrease_stream_retention_period +- [X] decrease_stream_retention_period - [X] delete_stream - [ ] deregister_stream_consumer - [ ] describe_limits @@ -6227,7 +6227,7 @@ - [ ] enable_enhanced_monitoring - [X] get_records - [X] get_shard_iterator -- [ ] increase_stream_retention_period +- [X] increase_stream_retention_period - [ ] list_shards - [ ] list_stream_consumers - [X] list_streams diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 3065b81af..dea436dfe 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -145,7 +145,9 @@ class Stream(CloudFormationModel): self.status = "ACTIVE" self.shard_count = None self.update_shard_count(shard_count) - self.retention_period_hours = retention_period_hours + self.retention_period_hours = ( + retention_period_hours if retention_period_hours else 24 + ) def update_shard_count(self, shard_count): # ToDo: This was extracted from init. It's only accurate for new streams. @@ -574,6 +576,26 @@ class KinesisBackend(BaseBackend): record.partition_key, record.data, record.explicit_hash_key ) + def increase_stream_retention_period(self, stream_name, retention_period_hours): + stream = self.describe_stream(stream_name) + if ( + retention_period_hours <= stream.retention_period_hours + or retention_period_hours < 24 + or retention_period_hours > 8760 + ): + raise InvalidArgumentError(retention_period_hours) + stream.retention_period_hours = retention_period_hours + + def decrease_stream_retention_period(self, stream_name, retention_period_hours): + stream = self.describe_stream(stream_name) + if ( + retention_period_hours >= stream.retention_period_hours + or retention_period_hours < 24 + or retention_period_hours > 8760 + ): + raise InvalidArgumentError(retention_period_hours) + stream.retention_period_hours = retention_period_hours + """ Firehose """ def create_delivery_stream(self, stream_name, **stream_kwargs): diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 8e7fc3941..dd30293e6 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -149,6 +149,22 @@ class KinesisResponse(BaseResponse): ) return "" + def increase_stream_retention_period(self): + stream_name = self.parameters.get("StreamName") + retention_period_hours = self.parameters.get("RetentionPeriodHours") + self.kinesis_backend.increase_stream_retention_period( + stream_name, retention_period_hours + ) + return "" + + def decrease_stream_retention_period(self): + stream_name = self.parameters.get("StreamName") + retention_period_hours = self.parameters.get("RetentionPeriodHours") + self.kinesis_backend.decrease_stream_retention_period( + stream_name, retention_period_hours + ) + return "" + """ Firehose """ def create_delivery_stream(self): diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index a2fc2f346..cd04afd56 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -2,10 +2,13 @@ from __future__ import unicode_literals import datetime import time +import pytest import boto.kinesis import boto3 from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException +from botocore.exceptions import ClientError + from dateutil.tz import tzlocal from moto import mock_kinesis, mock_kinesis_deprecated @@ -479,6 +482,71 @@ def test_get_records_from_empty_stream_at_timestamp(): response["MillisBehindLatest"].should.equal(0) +@mock_kinesis +def test_valid_increase_stream_retention_period(): + conn = boto3.client("kinesis", region_name="us-west-2") + stream_name = "my_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + conn.increase_stream_retention_period( + StreamName=stream_name, RetentionPeriodHours=40 + ) + + response = conn.describe_stream(StreamName=stream_name) + response["StreamDescription"]["RetentionPeriodHours"].should.equal(40) + + +@mock_kinesis +def test_invalid_increase_stream_retention_period(): + conn = boto3.client("kinesis", region_name="us-west-2") + stream_name = "my_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + conn.increase_stream_retention_period( + StreamName=stream_name, RetentionPeriodHours=30 + ) + with pytest.raises(ClientError) as ex: + conn.increase_stream_retention_period( + StreamName=stream_name, RetentionPeriodHours=20 + ) + ex.value.response["Error"]["Code"].should.equal("InvalidArgumentException") + ex.value.response["Error"]["Message"].should.equal(20) + + +@mock_kinesis +def test_valid_decrease_stream_retention_period(): + conn = boto3.client("kinesis", region_name="us-west-2") + stream_name = "decrease_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + conn.increase_stream_retention_period( + StreamName=stream_name, RetentionPeriodHours=30 + ) + conn.decrease_stream_retention_period( + StreamName=stream_name, RetentionPeriodHours=25 + ) + + response = conn.describe_stream(StreamName=stream_name) + response["StreamDescription"]["RetentionPeriodHours"].should.equal(25) + + +@mock_kinesis +def test_invalid_decrease_stream_retention_period(): + conn = boto3.client("kinesis", region_name="us-west-2") + stream_name = "decrease_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + conn.increase_stream_retention_period( + StreamName=stream_name, RetentionPeriodHours=30 + ) + with pytest.raises(ClientError) as ex: + conn.decrease_stream_retention_period( + StreamName=stream_name, RetentionPeriodHours=20 + ) + ex.value.response["Error"]["Code"].should.equal("InvalidArgumentException") + ex.value.response["Error"]["Message"].should.equal(20) + + @mock_kinesis_deprecated def test_invalid_shard_iterator_type(): conn = boto.kinesis.connect_to_region("us-west-2")