add support for kinesis AT_TIMESTAMP shard iterator (#1376)

Fixes #813
This commit is contained in:
Brian Cavagnolo 2017-12-08 02:57:05 -08:00 committed by Terry Cain
parent 92f5f7b263
commit d2eea02774
4 changed files with 145 additions and 4 deletions

View File

@ -12,6 +12,7 @@ from hashlib import md5
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
from moto.core.utils import unix_time
from .exceptions import StreamNotFoundError, ShardNotFoundError, ResourceInUseError, \
ResourceNotFoundError, InvalidArgumentError
from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose_shard_iterator
@ -24,6 +25,7 @@ class Record(BaseModel):
self.data = data
self.sequence_number = sequence_number
self.explicit_hash_key = explicit_hash_key
self.create_at = unix_time()
def to_json(self):
return {
@ -80,6 +82,15 @@ class Shard(BaseModel):
return list(self.records.keys())[-1]
return 0
def get_sequence_number_at(self, at_timestamp):
if not self.records or at_timestamp < list(self.records.values())[0].create_at:
return 0
else:
# find the last item in the list that was created before
# at_timestamp
r = next((r for r in reversed(self.records.values()) if r.create_at < at_timestamp), None)
return r.sequence_number
def to_json(self):
return {
"HashKeyRange": {
@ -300,13 +311,14 @@ class KinesisBackend(BaseBackend):
return self.streams.pop(stream_name)
raise StreamNotFoundError(stream_name)
def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, starting_sequence_number):
def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, starting_sequence_number,
at_timestamp):
# Validate params
stream = self.describe_stream(stream_name)
shard = stream.get_shard(shard_id)
shard_iterator = compose_new_shard_iterator(
stream_name, shard, shard_iterator_type, starting_sequence_number
stream_name, shard, shard_iterator_type, starting_sequence_number, at_timestamp
)
return shard_iterator

View File

@ -66,9 +66,10 @@ class KinesisResponse(BaseResponse):
shard_iterator_type = self.parameters.get("ShardIteratorType")
starting_sequence_number = self.parameters.get(
"StartingSequenceNumber")
at_timestamp = self.parameters.get("Timestamp")
shard_iterator = self.kinesis_backend.get_shard_iterator(
stream_name, shard_id, shard_iterator_type, starting_sequence_number,
stream_name, shard_id, shard_iterator_type, starting_sequence_number, at_timestamp
)
return json.dumps({

View File

@ -3,7 +3,8 @@ import base64
from .exceptions import InvalidArgumentError
def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number):
def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number,
at_timestamp):
if shard_iterator_type == "AT_SEQUENCE_NUMBER":
last_sequence_id = int(starting_sequence_number) - 1
elif shard_iterator_type == "AFTER_SEQUENCE_NUMBER":
@ -12,6 +13,8 @@ def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting
last_sequence_id = 0
elif shard_iterator_type == "LATEST":
last_sequence_id = shard.get_max_sequence_number()
elif shard_iterator_type == "AT_TIMESTAMP":
last_sequence_id = shard.get_sequence_number_at(at_timestamp)
else:
raise InvalidArgumentError(
"Invalid ShardIteratorType: {0}".format(shard_iterator_type))

View File

@ -4,6 +4,8 @@ import boto.kinesis
from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException
import boto3
import sure # noqa
import datetime
import time
from moto import mock_kinesis, mock_kinesis_deprecated
@ -262,6 +264,129 @@ def test_get_records_latest():
response['Records'][0]['Data'].should.equal('last_record')
@mock_kinesis
def test_get_records_at_timestamp():
# AT_TIMESTAMP - Read the first record at or after the specified timestamp
conn = boto3.client('kinesis', region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
# Create some data
for index in range(1, 5):
conn.put_record(StreamName=stream_name,
Data=str(index),
PartitionKey=str(index))
# When boto3 floors the timestamp that we pass to get_shard_iterator to
# second precision even though AWS supports ms precision:
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
# To test around this limitation we wait until we well into the next second
# before capturing the time and storing the records we expect to retrieve.
time.sleep(1.0)
timestamp = datetime.datetime.utcnow()
keys = [str(i) for i in range(5, 10)]
for k in keys:
conn.put_record(StreamName=stream_name,
Data=k,
PartitionKey=k)
# Get a shard iterator
response = conn.describe_stream(StreamName=stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=timestamp)
shard_iterator = response['ShardIterator']
response = conn.get_records(ShardIterator=shard_iterator)
response['Records'].should.have.length_of(len(keys))
partition_keys = [r['PartitionKey'] for r in response['Records']]
partition_keys.should.equal(keys)
@mock_kinesis
def test_get_records_at_very_old_timestamp():
conn = boto3.client('kinesis', region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
# Create some data
keys = [str(i) for i in range(1, 5)]
for k in keys:
conn.put_record(StreamName=stream_name,
Data=k,
PartitionKey=k)
# Get a shard iterator
response = conn.describe_stream(StreamName=stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=1)
shard_iterator = response['ShardIterator']
response = conn.get_records(ShardIterator=shard_iterator)
response['Records'].should.have.length_of(len(keys))
partition_keys = [r['PartitionKey'] for r in response['Records']]
partition_keys.should.equal(keys)
@mock_kinesis
def test_get_records_at_very_new_timestamp():
conn = boto3.client('kinesis', region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
# Create some data
keys = [str(i) for i in range(1, 5)]
for k in keys:
conn.put_record(StreamName=stream_name,
Data=k,
PartitionKey=k)
timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
# Get a shard iterator
response = conn.describe_stream(StreamName=stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=timestamp)
shard_iterator = response['ShardIterator']
response = conn.get_records(ShardIterator=shard_iterator)
response['Records'].should.have.length_of(0)
@mock_kinesis
def test_get_records_from_empty_stream_at_timestamp():
conn = boto3.client('kinesis', region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
timestamp = datetime.datetime.utcnow()
# Get a shard iterator
response = conn.describe_stream(StreamName=stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=timestamp)
shard_iterator = response['ShardIterator']
response = conn.get_records(ShardIterator=shard_iterator)
response['Records'].should.have.length_of(0)
@mock_kinesis_deprecated
def test_invalid_shard_iterator_type():
conn = boto.kinesis.connect_to_region("us-west-2")