From 57d45aa4b8f1e4cc589e9a8954342881cd26f47d Mon Sep 17 00:00:00 2001 From: Steve Pulec Date: Wed, 26 Nov 2014 20:49:21 -0500 Subject: [PATCH] Add more shard iterator types. --- moto/kinesis/exceptions.py | 15 +++ moto/kinesis/models.py | 144 +++++++++++++++++---- moto/kinesis/responses.py | 41 ++++++ moto/kinesis/utils.py | 31 +++++ tests/test_kinesis/test_kinesis.py | 193 ++++++++++++++++++++++++++++- 5 files changed, 402 insertions(+), 22 deletions(-) create mode 100644 moto/kinesis/utils.py diff --git a/moto/kinesis/exceptions.py b/moto/kinesis/exceptions.py index 4ba4c8bf8..c6be76885 100644 --- a/moto/kinesis/exceptions.py +++ b/moto/kinesis/exceptions.py @@ -17,3 +17,18 @@ class StreamNotFoundError(ResourceNotFoundError): def __init__(self, stream_name): super(StreamNotFoundError, self).__init__( 'Stream {0} under account 123456789012 not found.'.format(stream_name)) + + +class ShardNotFoundError(ResourceNotFoundError): + def __init__(self, shard_id): + super(ShardNotFoundError, self).__init__( + 'Shard {0} under account 123456789012 not found.'.format(shard_id)) + + +class InvalidArgumentError(BadRequest): + def __init__(self, message): + super(InvalidArgumentError, self).__init__() + self.description = json.dumps({ + "message": message, + '__type': 'InvalidArgumentException', + }) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 04fb0550d..0a3eef3cd 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -2,7 +2,74 @@ from __future__ import unicode_literals import boto.kinesis from moto.core import BaseBackend -from .exceptions import StreamNotFoundError +from .exceptions import StreamNotFoundError, ShardNotFoundError +from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose_shard_iterator + +try: + from collections import OrderedDict +except ImportError: + # python 2.6 or earlier, use backport + from ordereddict import OrderedDict + + +class Record(object): + def __init__(self, partition_key, data, sequence_number): + self.partition_key = partition_key + self.data = data + self.sequence_number = sequence_number + + def to_json(self): + return { + "Data": self.data, + "PartitionKey": self.partition_key, + "SequenceNumber": str(self.sequence_number), + } + + +class Shard(object): + def __init__(self, shard_id): + self.shard_id = shard_id + self.records = OrderedDict() + + def get_records(self, last_sequence_id, limit): + last_sequence_id = int(last_sequence_id) + results = [] + + for sequence_number, record in self.records.items(): + if sequence_number > last_sequence_id: + results.append(record) + last_sequence_id = sequence_number + + if len(results) == limit: + break + + return results, last_sequence_id + + def put_record(self, partition_key, data): + # Note: this function is not safe for concurrency + if self.records: + last_sequence_number = self.get_max_sequence_number() + else: + last_sequence_number = 0 + sequence_number = last_sequence_number + 1 + self.records[sequence_number] = Record(partition_key, data, sequence_number) + return sequence_number + + def get_max_sequence_number(self): + return self.records.keys()[-1] + + def to_json(self): + return { + "HashKeyRange": { + "EndingHashKey": "113427455640312821154458202477256070484", + "StartingHashKey": "0" + }, + "SequenceNumberRange": { + "EndingSequenceNumber": "21269319989741826081360214168359141376", + "StartingSequenceNumber": "21267647932558653966460912964485513216" + }, + "ShardId": self.shard_id + } class Stream(object): @@ -11,6 +78,11 @@ class Stream(object): self.shard_count = shard_count self.region = region self.account_number = "123456789012" + self.shards = {} + + for index in range(shard_count): + shard_id = "shardId-{}".format(str(index).zfill(12)) + self.shards[shard_id] = Shard(shard_id) @property def arn(self): @@ -20,6 +92,24 @@ class Stream(object): stream_name=self.stream_name ) + def get_shard(self, shard_id): + if shard_id in self.shards: + return self.shards[shard_id] + else: + raise ShardNotFoundError(shard_id) + + def get_shard_for_key(self, partition_key): + # TODO implement sharding + shard = self.shards.values()[0] + return shard + + def put_record(self, partition_key, explicit_hash_key, sequence_number_for_ordering, data): + partition_key = explicit_hash_key if explicit_hash_key else partition_key + shard = self.get_shard_for_key(partition_key) + + sequence_number = shard.put_record(partition_key, data) + return sequence_number, shard.shard_id + def to_json(self): return { "StreamDescription": { @@ -27,26 +117,7 @@ class Stream(object): "StreamName": self.stream_name, "StreamStatus": "ACTIVE", "HasMoreShards": False, - "Shards": [{ - "HashKeyRange": { - "EndingHashKey": "113427455640312821154458202477256070484", - "StartingHashKey": "0" - }, - "SequenceNumberRange": { - "EndingSequenceNumber": "21269319989741826081360214168359141376", - "StartingSequenceNumber": "21267647932558653966460912964485513216" - }, - "ShardId": "shardId-000000000000" - }, { - "HashKeyRange": { - "EndingHashKey": "226854911280625642308916404954512140969", - "StartingHashKey": "113427455640312821154458202477256070485" - }, - "SequenceNumberRange": { - "StartingSequenceNumber": "21267647932558653966460912964485513217" - }, - "ShardId": "shardId-000000000001" - }], + "Shards": [shard.to_json() for shard in self.shards.values()], } } @@ -75,6 +146,37 @@ class KinesisBackend(BaseBackend): return self.streams.pop(stream_name) raise StreamNotFoundError(stream_name) + def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, starting_sequence_number): + # Validate params + stream = self.describe_stream(stream_name) + shard = stream.get_shard(shard_id) + + shard_iterator = compose_new_shard_iterator( + stream_name, shard, shard_iterator_type, starting_sequence_number + ) + return shard_iterator + + def get_records(self, shard_iterator, limit): + decomposed = decompose_shard_iterator(shard_iterator) + stream_name, shard_id, last_sequence_id = decomposed + + stream = self.describe_stream(stream_name) + shard = stream.get_shard(shard_id) + + records, last_sequence_id = shard.get_records(last_sequence_id, limit) + + next_shard_iterator = compose_shard_iterator(stream_name, shard, last_sequence_id) + + return next_shard_iterator, records + + def put_record(self, stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data): + stream = self.describe_stream(stream_name) + + sequence_number, shard_id = stream.put_record( + partition_key, explicit_hash_key, sequence_number_for_ordering, data + ) + + return sequence_number, shard_id kinesis_backends = {} for region in boto.kinesis.regions(): diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 43ed8b06a..4b5f13729 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -40,3 +40,44 @@ class KinesisResponse(BaseResponse): self.kinesis_backend.delete_stream(stream_name) return "" + + def get_shard_iterator(self): + stream_name = self.parameters.get("StreamName") + shard_id = self.parameters.get("ShardId") + shard_iterator_type = self.parameters.get("ShardIteratorType") + starting_sequence_number = self.parameters.get("StartingSequenceNumber") + + shard_iterator = self.kinesis_backend.get_shard_iterator( + stream_name, shard_id, shard_iterator_type, starting_sequence_number, + ) + + return json.dumps({ + "ShardIterator": shard_iterator + }) + + def get_records(self): + shard_iterator = self.parameters.get("ShardIterator") + limit = self.parameters.get("Limit") + + next_shard_iterator, records = self.kinesis_backend.get_records(shard_iterator, limit) + + return json.dumps({ + "NextShardIterator": next_shard_iterator, + "Records": [record.to_json() for record in records] + }) + + def put_record(self): + stream_name = self.parameters.get("StreamName") + partition_key = self.parameters.get("PartitionKey") + explicit_hash_key = self.parameters.get("ExplicitHashKey") + sequence_number_for_ordering = self.parameters.get("SequenceNumberForOrdering") + data = self.parameters.get("Data") + + sequence_number, shard_id = self.kinesis_backend.put_record( + stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data + ) + + return json.dumps({ + "SequenceNumber": sequence_number, + "ShardId": shard_id, + }) diff --git a/moto/kinesis/utils.py b/moto/kinesis/utils.py new file mode 100644 index 000000000..c998fe295 --- /dev/null +++ b/moto/kinesis/utils.py @@ -0,0 +1,31 @@ +import base64 + +from .exceptions import InvalidArgumentError + + +def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number): + if shard_iterator_type == "AT_SEQUENCE_NUMBER": + last_sequence_id = int(starting_sequence_number) - 1 + elif shard_iterator_type == "AFTER_SEQUENCE_NUMBER": + last_sequence_id = int(starting_sequence_number) + elif shard_iterator_type == "TRIM_HORIZON": + last_sequence_id = 0 + elif shard_iterator_type == "LATEST": + last_sequence_id = shard.get_max_sequence_number() + else: + raise InvalidArgumentError("Invalid ShardIteratorType: {0}".format(shard_iterator_type)) + return compose_shard_iterator(stream_name, shard, last_sequence_id) + + +def compose_shard_iterator(stream_name, shard, last_sequence_id): + return base64.encodestring( + "{0}:{1}:{2}".format( + stream_name, + shard.shard_id, + last_sequence_id, + ) + ) + + +def decompose_shard_iterator(shard_iterator): + return base64.decodestring(shard_iterator).split(":") diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index 8272ec75e..b85dc1dbd 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -1,7 +1,7 @@ from __future__ import unicode_literals import boto.kinesis -from boto.kinesis.exceptions import ResourceNotFoundException +from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException import sure # noqa from moto import mock_kinesis @@ -46,3 +46,194 @@ def test_list_and_delete_stream(): # Delete invalid id conn.delete_stream.when.called_with("not-a-stream").should.throw(ResourceNotFoundException) + + +@mock_kinesis +def test_basic_shard_iterator(): + conn = boto.kinesis.connect_to_region("us-west-2") + + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + response = conn.get_records(shard_iterator) + shard_iterator = response['NextShardIterator'] + response['Records'].should.equal([]) + + +@mock_kinesis +def test_get_invalid_shard_iterator(): + conn = boto.kinesis.connect_to_region("us-west-2") + + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + conn.get_shard_iterator.when.called_with(stream_name, "123", 'TRIM_HORIZON').should.throw(ResourceNotFoundException) + + +@mock_kinesis +def test_put_records(): + conn = boto.kinesis.connect_to_region("us-west-2") + + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + data = "hello world" + partition_key = "1234" + conn.put_record(stream_name, data, partition_key) + + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + response = conn.get_records(shard_iterator) + shard_iterator = response['NextShardIterator'] + response['Records'].should.have.length_of(1) + record = response['Records'][0] + + record["Data"].should.equal("hello world") + record["PartitionKey"].should.equal("1234") + record["SequenceNumber"].should.equal("1") + + +@mock_kinesis +def test_get_records_limit(): + conn = boto.kinesis.connect_to_region("us-west-2") + + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + # Create some data + data = "hello world" + for index in range(5): + conn.put_record(stream_name, data, index) + + # Get a shard iterator + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + # Retrieve only 3 records + response = conn.get_records(shard_iterator, limit=3) + response['Records'].should.have.length_of(3) + + # Then get the rest of the results + next_shard_iterator = response['NextShardIterator'] + response = conn.get_records(next_shard_iterator) + response['Records'].should.have.length_of(2) + + +@mock_kinesis +def test_get_records_at_sequence_number(): + # AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by a specific sequence number. + conn = boto.kinesis.connect_to_region("us-west-2") + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + # Create some data + for index in range(1, 5): + conn.put_record(stream_name, str(index), index) + + # Get a shard iterator + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + # Get the second record + response = conn.get_records(shard_iterator, limit=2) + second_sequence_id = response['Records'][1]['SequenceNumber'] + + # Then get a new iterator starting at that id + response = conn.get_shard_iterator(stream_name, shard_id, 'AT_SEQUENCE_NUMBER', second_sequence_id) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(shard_iterator) + # And the first result returned should be the second item + response['Records'][0]['SequenceNumber'].should.equal(second_sequence_id) + response['Records'][0]['Data'].should.equal('2') + + +@mock_kinesis +def test_get_records_after_sequence_number(): + # AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted by a specific sequence number. + conn = boto.kinesis.connect_to_region("us-west-2") + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + # Create some data + for index in range(1, 5): + conn.put_record(stream_name, str(index), index) + + # Get a shard iterator + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + # Get the second record + response = conn.get_records(shard_iterator, limit=2) + second_sequence_id = response['Records'][1]['SequenceNumber'] + + # Then get a new iterator starting after that id + response = conn.get_shard_iterator(stream_name, shard_id, 'AFTER_SEQUENCE_NUMBER', second_sequence_id) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(shard_iterator) + # And the first result returned should be the third item + response['Records'][0]['Data'].should.equal('3') + + +@mock_kinesis +def test_get_records_latest(): + # LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard. + conn = boto.kinesis.connect_to_region("us-west-2") + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + # Create some data + for index in range(1, 5): + conn.put_record(stream_name, str(index), index) + + # Get a shard iterator + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + # Get the second record + response = conn.get_records(shard_iterator, limit=2) + second_sequence_id = response['Records'][1]['SequenceNumber'] + + # Then get a new iterator starting after that id + response = conn.get_shard_iterator(stream_name, shard_id, 'LATEST', second_sequence_id) + shard_iterator = response['ShardIterator'] + + # Write some more data + conn.put_record(stream_name, "last_record", "last_record") + + response = conn.get_records(shard_iterator) + # And the only result returned should be the new item + response['Records'].should.have.length_of(1) + response['Records'][0]['PartitionKey'].should.equal('last_record') + response['Records'][0]['Data'].should.equal('last_record') + + +@mock_kinesis +def test_invalid_shard_iterator_type(): + conn = boto.kinesis.connect_to_region("us-west-2") + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator.when.called_with( + stream_name, shard_id, 'invalid-type').should.throw(InvalidArgumentException)