#4067: Add support for kinesis retention hours (#4068)

This commit is contained in:
Shubham Gupta 2021-07-14 22:36:30 +08:00 committed by GitHub
parent cb53f86c24
commit a48c811069
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 109 additions and 3 deletions

View File

@ -6216,7 +6216,7 @@
- [X] add_tags_to_stream - [X] add_tags_to_stream
- [X] create_stream - [X] create_stream
- [ ] decrease_stream_retention_period - [X] decrease_stream_retention_period
- [X] delete_stream - [X] delete_stream
- [ ] deregister_stream_consumer - [ ] deregister_stream_consumer
- [ ] describe_limits - [ ] describe_limits
@ -6227,7 +6227,7 @@
- [ ] enable_enhanced_monitoring - [ ] enable_enhanced_monitoring
- [X] get_records - [X] get_records
- [X] get_shard_iterator - [X] get_shard_iterator
- [ ] increase_stream_retention_period - [X] increase_stream_retention_period
- [ ] list_shards - [ ] list_shards
- [ ] list_stream_consumers - [ ] list_stream_consumers
- [X] list_streams - [X] list_streams

View File

@ -145,7 +145,9 @@ class Stream(CloudFormationModel):
self.status = "ACTIVE" self.status = "ACTIVE"
self.shard_count = None self.shard_count = None
self.update_shard_count(shard_count) self.update_shard_count(shard_count)
self.retention_period_hours = retention_period_hours self.retention_period_hours = (
retention_period_hours if retention_period_hours else 24
)
def update_shard_count(self, shard_count): def update_shard_count(self, shard_count):
# ToDo: This was extracted from init. It's only accurate for new streams. # ToDo: This was extracted from init. It's only accurate for new streams.
@ -574,6 +576,26 @@ class KinesisBackend(BaseBackend):
record.partition_key, record.data, record.explicit_hash_key record.partition_key, record.data, record.explicit_hash_key
) )
def increase_stream_retention_period(self, stream_name, retention_period_hours):
stream = self.describe_stream(stream_name)
if (
retention_period_hours <= stream.retention_period_hours
or retention_period_hours < 24
or retention_period_hours > 8760
):
raise InvalidArgumentError(retention_period_hours)
stream.retention_period_hours = retention_period_hours
def decrease_stream_retention_period(self, stream_name, retention_period_hours):
stream = self.describe_stream(stream_name)
if (
retention_period_hours >= stream.retention_period_hours
or retention_period_hours < 24
or retention_period_hours > 8760
):
raise InvalidArgumentError(retention_period_hours)
stream.retention_period_hours = retention_period_hours
""" Firehose """ """ Firehose """
def create_delivery_stream(self, stream_name, **stream_kwargs): def create_delivery_stream(self, stream_name, **stream_kwargs):

View File

@ -149,6 +149,22 @@ class KinesisResponse(BaseResponse):
) )
return "" return ""
def increase_stream_retention_period(self):
stream_name = self.parameters.get("StreamName")
retention_period_hours = self.parameters.get("RetentionPeriodHours")
self.kinesis_backend.increase_stream_retention_period(
stream_name, retention_period_hours
)
return ""
def decrease_stream_retention_period(self):
stream_name = self.parameters.get("StreamName")
retention_period_hours = self.parameters.get("RetentionPeriodHours")
self.kinesis_backend.decrease_stream_retention_period(
stream_name, retention_period_hours
)
return ""
""" Firehose """ """ Firehose """
def create_delivery_stream(self): def create_delivery_stream(self):

View File

@ -2,10 +2,13 @@ from __future__ import unicode_literals
import datetime import datetime
import time import time
import pytest
import boto.kinesis import boto.kinesis
import boto3 import boto3
from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException
from botocore.exceptions import ClientError
from dateutil.tz import tzlocal from dateutil.tz import tzlocal
from moto import mock_kinesis, mock_kinesis_deprecated from moto import mock_kinesis, mock_kinesis_deprecated
@ -479,6 +482,71 @@ def test_get_records_from_empty_stream_at_timestamp():
response["MillisBehindLatest"].should.equal(0) response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_valid_increase_stream_retention_period():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=40
)
response = conn.describe_stream(StreamName=stream_name)
response["StreamDescription"]["RetentionPeriodHours"].should.equal(40)
@mock_kinesis
def test_invalid_increase_stream_retention_period():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=30
)
with pytest.raises(ClientError) as ex:
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=20
)
ex.value.response["Error"]["Code"].should.equal("InvalidArgumentException")
ex.value.response["Error"]["Message"].should.equal(20)
@mock_kinesis
def test_valid_decrease_stream_retention_period():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "decrease_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=30
)
conn.decrease_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=25
)
response = conn.describe_stream(StreamName=stream_name)
response["StreamDescription"]["RetentionPeriodHours"].should.equal(25)
@mock_kinesis
def test_invalid_decrease_stream_retention_period():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "decrease_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=30
)
with pytest.raises(ClientError) as ex:
conn.decrease_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=20
)
ex.value.response["Error"]["Code"].should.equal("InvalidArgumentException")
ex.value.response["Error"]["Message"].should.equal(20)
@mock_kinesis_deprecated @mock_kinesis_deprecated
def test_invalid_shard_iterator_type(): def test_invalid_shard_iterator_type():
conn = boto.kinesis.connect_to_region("us-west-2") conn = boto.kinesis.connect_to_region("us-west-2")