From d2eea0277487a254f21759ff766150f6caa42b1b Mon Sep 17 00:00:00 2001 From: Brian Cavagnolo Date: Fri, 8 Dec 2017 02:57:05 -0800 Subject: [PATCH] add support for kinesis AT_TIMESTAMP shard iterator (#1376) Fixes #813 --- moto/kinesis/models.py | 16 +++- moto/kinesis/responses.py | 3 +- moto/kinesis/utils.py | 5 +- tests/test_kinesis/test_kinesis.py | 125 +++++++++++++++++++++++++++++ 4 files changed, 145 insertions(+), 4 deletions(-) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index aae94bbbd..d9ea3b897 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -12,6 +12,7 @@ from hashlib import md5 from moto.compat import OrderedDict from moto.core import BaseBackend, BaseModel +from moto.core.utils import unix_time from .exceptions import StreamNotFoundError, ShardNotFoundError, ResourceInUseError, \ ResourceNotFoundError, InvalidArgumentError from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose_shard_iterator @@ -24,6 +25,7 @@ class Record(BaseModel): self.data = data self.sequence_number = sequence_number self.explicit_hash_key = explicit_hash_key + self.create_at = unix_time() def to_json(self): return { @@ -80,6 +82,15 @@ class Shard(BaseModel): return list(self.records.keys())[-1] return 0 + def get_sequence_number_at(self, at_timestamp): + if not self.records or at_timestamp < list(self.records.values())[0].create_at: + return 0 + else: + # find the last item in the list that was created before + # at_timestamp + r = next((r for r in reversed(self.records.values()) if r.create_at < at_timestamp), None) + return r.sequence_number + def to_json(self): return { "HashKeyRange": { @@ -300,13 +311,14 @@ class KinesisBackend(BaseBackend): return self.streams.pop(stream_name) raise StreamNotFoundError(stream_name) - def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, starting_sequence_number): + def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, 
starting_sequence_number, + at_timestamp): # Validate params stream = self.describe_stream(stream_name) shard = stream.get_shard(shard_id) shard_iterator = compose_new_shard_iterator( - stream_name, shard, shard_iterator_type, starting_sequence_number + stream_name, shard, shard_iterator_type, starting_sequence_number, at_timestamp ) return shard_iterator diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 1ac6cd756..b9b4883ef 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -66,9 +66,10 @@ class KinesisResponse(BaseResponse): shard_iterator_type = self.parameters.get("ShardIteratorType") starting_sequence_number = self.parameters.get( "StartingSequenceNumber") + at_timestamp = self.parameters.get("Timestamp") shard_iterator = self.kinesis_backend.get_shard_iterator( - stream_name, shard_id, shard_iterator_type, starting_sequence_number, + stream_name, shard_id, shard_iterator_type, starting_sequence_number, at_timestamp ) return json.dumps({ diff --git a/moto/kinesis/utils.py b/moto/kinesis/utils.py index 190371b2e..337728f02 100644 --- a/moto/kinesis/utils.py +++ b/moto/kinesis/utils.py @@ -3,7 +3,8 @@ import base64 from .exceptions import InvalidArgumentError -def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number): +def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number, + at_timestamp): if shard_iterator_type == "AT_SEQUENCE_NUMBER": last_sequence_id = int(starting_sequence_number) - 1 elif shard_iterator_type == "AFTER_SEQUENCE_NUMBER": @@ -12,6 +13,8 @@ def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting last_sequence_id = 0 elif shard_iterator_type == "LATEST": last_sequence_id = shard.get_max_sequence_number() + elif shard_iterator_type == "AT_TIMESTAMP": + last_sequence_id = shard.get_sequence_number_at(at_timestamp) else: raise InvalidArgumentError( "Invalid ShardIteratorType: 
{0}".format(shard_iterator_type)) diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index 26a87f35a..e3d350023 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -4,6 +4,8 @@ import boto.kinesis from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException import boto3 import sure # noqa +import datetime +import time from moto import mock_kinesis, mock_kinesis_deprecated @@ -262,6 +264,129 @@ def test_get_records_latest(): response['Records'][0]['Data'].should.equal('last_record') +@mock_kinesis +def test_get_records_at_timestamp(): + # AT_TIMESTAMP - Read the first record at or after the specified timestamp + conn = boto3.client('kinesis', region_name="us-west-2") + stream_name = "my_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + # Create some data + for index in range(1, 5): + conn.put_record(StreamName=stream_name, + Data=str(index), + PartitionKey=str(index)) + + # Note that boto3 floors the timestamp that we pass to get_shard_iterator to + # second precision even though AWS supports ms precision: + # http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html + # To test around this limitation we wait until we are well into the next second + # before capturing the time and storing the records we expect to retrieve. 
+ time.sleep(1.0) + timestamp = datetime.datetime.utcnow() + + keys = [str(i) for i in range(5, 10)] + for k in keys: + conn.put_record(StreamName=stream_name, + Data=k, + PartitionKey=k) + + # Get a shard iterator + response = conn.describe_stream(StreamName=stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType='AT_TIMESTAMP', + Timestamp=timestamp) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(ShardIterator=shard_iterator) + + response['Records'].should.have.length_of(len(keys)) + partition_keys = [r['PartitionKey'] for r in response['Records']] + partition_keys.should.equal(keys) + + +@mock_kinesis +def test_get_records_at_very_old_timestamp(): + conn = boto3.client('kinesis', region_name="us-west-2") + stream_name = "my_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + # Create some data + keys = [str(i) for i in range(1, 5)] + for k in keys: + conn.put_record(StreamName=stream_name, + Data=k, + PartitionKey=k) + + # Get a shard iterator + response = conn.describe_stream(StreamName=stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType='AT_TIMESTAMP', + Timestamp=1) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(ShardIterator=shard_iterator) + + response['Records'].should.have.length_of(len(keys)) + partition_keys = [r['PartitionKey'] for r in response['Records']] + partition_keys.should.equal(keys) + + +@mock_kinesis +def test_get_records_at_very_new_timestamp(): + conn = boto3.client('kinesis', region_name="us-west-2") + stream_name = "my_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + # Create some data + keys = [str(i) for i in range(1, 5)] + for k in keys: + conn.put_record(StreamName=stream_name, + 
Data=k, + PartitionKey=k) + + timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=1) + + # Get a shard iterator + response = conn.describe_stream(StreamName=stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType='AT_TIMESTAMP', + Timestamp=timestamp) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(ShardIterator=shard_iterator) + + response['Records'].should.have.length_of(0) + + +@mock_kinesis +def test_get_records_from_empty_stream_at_timestamp(): + conn = boto3.client('kinesis', region_name="us-west-2") + stream_name = "my_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + timestamp = datetime.datetime.utcnow() + + # Get a shard iterator + response = conn.describe_stream(StreamName=stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType='AT_TIMESTAMP', + Timestamp=timestamp) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(ShardIterator=shard_iterator) + + response['Records'].should.have.length_of(0) + + @mock_kinesis_deprecated def test_invalid_shard_iterator_type(): conn = boto.kinesis.connect_to_region("us-west-2")