From da15fb711d638fe928cad0b5f47bcc188c3e6fc9 Mon Sep 17 00:00:00 2001 From: Steve Pulec Date: Wed, 26 Nov 2014 10:55:58 -0500 Subject: [PATCH 1/5] Basic Kinesis Stream CRUD. --- moto/__init__.py | 1 + moto/backends.py | 2 + moto/core/responses.py | 6 +++ moto/kinesis/__init__.py | 12 +++++ moto/kinesis/exceptions.py | 19 +++++++ moto/kinesis/models.py | 81 ++++++++++++++++++++++++++++++ moto/kinesis/responses.py | 42 ++++++++++++++++ moto/kinesis/urls.py | 10 ++++ tests/test_kinesis/test_kinesis.py | 48 ++++++++++++++++++ tests/test_kinesis/test_server.py | 25 +++++++++ 10 files changed, 246 insertions(+) create mode 100644 moto/kinesis/__init__.py create mode 100644 moto/kinesis/exceptions.py create mode 100644 moto/kinesis/models.py create mode 100644 moto/kinesis/responses.py create mode 100644 moto/kinesis/urls.py create mode 100644 tests/test_kinesis/test_kinesis.py create mode 100644 tests/test_kinesis/test_server.py diff --git a/moto/__init__.py b/moto/__init__.py index 75bd5a53c..8041f0856 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -11,6 +11,7 @@ from .ec2 import mock_ec2 # flake8: noqa from .elb import mock_elb # flake8: noqa from .emr import mock_emr # flake8: noqa from .iam import mock_iam # flake8: noqa +from .kinesis import mock_kinesis # flake8: noqa from .redshift import mock_redshift # flake8: noqa from .s3 import mock_s3 # flake8: noqa from .s3bucket_path import mock_s3bucket_path # flake8: noqa diff --git a/moto/backends.py b/moto/backends.py index d9df2133d..cf6759d99 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -6,6 +6,7 @@ from moto.dynamodb2 import dynamodb_backend2 from moto.ec2 import ec2_backend from moto.elb import elb_backend from moto.emr import emr_backend +from moto.kinesis import kinesis_backend from moto.redshift import redshift_backend from moto.s3 import s3_backend from moto.s3bucket_path import s3bucket_path_backend @@ -22,6 +23,7 @@ BACKENDS = { 'ec2': ec2_backend, 'elb': elb_backend, 'emr': emr_backend, + 'kinesis': kinesis_backend, 'redshift': redshift_backend, 's3': s3_backend, 's3bucket_path': s3bucket_path_backend, diff --git a/moto/core/responses.py b/moto/core/responses.py index 80e3e77f6..c4b3f38ed 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -90,6 +90,12 @@ class BaseResponse(object): def call_action(self): headers = self.response_headers action = self.querystring.get('Action', [""])[0] + if not action: # Some services use a header for the action + # Headers are case-insensitive. Probably a better way to do this. + match = self.headers.get('x-amz-target') or self.headers.get('X-Amz-Target') + if match: + action = match.split(".")[1] + action = camelcase_to_underscores(action) method_names = method_names_from_class(self.__class__) if action in method_names: diff --git a/moto/kinesis/__init__.py b/moto/kinesis/__init__.py new file mode 100644 index 000000000..415b960e1 --- /dev/null +++ b/moto/kinesis/__init__.py @@ -0,0 +1,12 @@ +from __future__ import unicode_literals +from .models import kinesis_backends +from ..core.models import MockAWS + +kinesis_backend = kinesis_backends['us-east-1'] + + +def mock_kinesis(func=None): + if func: + return MockAWS(kinesis_backends)(func) + else: + return MockAWS(kinesis_backends) diff --git a/moto/kinesis/exceptions.py b/moto/kinesis/exceptions.py new file mode 100644 index 000000000..981a577ec --- /dev/null +++ b/moto/kinesis/exceptions.py @@ -0,0 +1,19 @@ +from __future__ import unicode_literals + +import json +from werkzeug.exceptions import BadRequest + + +class ResourceNotFoundError(BadRequest): + def __init__(self, message): + super(ResourceNotFoundError, self).__init__() + self.description = json.dumps({ + "message": message, + '__type': 'ResourceNotFoundException', + }) + + +class StreamNotFoundError(ResourceNotFoundError): + def __init__(self, stream_name): + super(StreamNotFoundError, self).__init__( + 'Stream {} under account 123456789012 not found.'.format(stream_name)) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py new file mode 100644 index 000000000..04fb0550d --- /dev/null +++ b/moto/kinesis/models.py @@ -0,0 +1,81 @@ +from __future__ import unicode_literals + +import boto.kinesis +from moto.core import BaseBackend +from .exceptions import StreamNotFoundError + + +class Stream(object): + def __init__(self, stream_name, shard_count, region): + self.stream_name = stream_name + self.shard_count = shard_count + self.region = region + self.account_number = "123456789012" + + @property + def arn(self): + return "arn:aws:kinesis:{region}:{account_number}:{stream_name}".format( + region=self.region, + account_number=self.account_number, + stream_name=self.stream_name + ) + + def to_json(self): + return { + "StreamDescription": { + "StreamARN": self.arn, + "StreamName": self.stream_name, + "StreamStatus": "ACTIVE", + "HasMoreShards": False, + "Shards": [{ + "HashKeyRange": { + "EndingHashKey": "113427455640312821154458202477256070484", + "StartingHashKey": "0" + }, + "SequenceNumberRange": { + "EndingSequenceNumber": "21269319989741826081360214168359141376", + "StartingSequenceNumber": "21267647932558653966460912964485513216" + }, + "ShardId": "shardId-000000000000" + }, { + "HashKeyRange": { + "EndingHashKey": "226854911280625642308916404954512140969", + "StartingHashKey": "113427455640312821154458202477256070485" + }, + "SequenceNumberRange": { + "StartingSequenceNumber": "21267647932558653966460912964485513217" + }, + "ShardId": "shardId-000000000001" + }], + } + } + + +class KinesisBackend(BaseBackend): + + def __init__(self): + self.streams = {} + + def create_stream(self, stream_name, shard_count, region): + stream = Stream(stream_name, shard_count, region) + self.streams[stream_name] = stream + return stream + + def describe_stream(self, stream_name): + if stream_name in self.streams: + return self.streams[stream_name] + else: + raise StreamNotFoundError(stream_name) + + def list_streams(self): + return self.streams.values() + + def delete_stream(self, stream_name): + if stream_name in self.streams: + return self.streams.pop(stream_name) + raise StreamNotFoundError(stream_name) + + +kinesis_backends = {} +for region in boto.kinesis.regions(): + kinesis_backends[region.name] = KinesisBackend() diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py new file mode 100644 index 000000000..fe57cbfd1 --- /dev/null +++ b/moto/kinesis/responses.py @@ -0,0 +1,42 @@ +from __future__ import unicode_literals + +import json + +from moto.core.responses import BaseResponse +from .models import kinesis_backends + + +class KinesisResponse(BaseResponse): + + @property + def parameters(self): + return json.loads(self.body) + + @property + def kinesis_backend(self): + return kinesis_backends[self.region] + + def create_stream(self): + stream_name = self.parameters.get('StreamName') + shard_count = self.parameters.get('ShardCount') + self.kinesis_backend.create_stream(stream_name, shard_count, self.region) + return "" + + def describe_stream(self): + stream_name = self.parameters.get('StreamName') + stream = self.kinesis_backend.describe_stream(stream_name) + return json.dumps(stream.to_json()) + + def list_streams(self): + streams = self.kinesis_backend.list_streams() + + return json.dumps({ + "HasMoreStreams": False, + "StreamNames": [stream.stream_name for stream in streams], + }) + + def delete_stream(self): + stream_name = self.parameters.get("StreamName") + self.kinesis_backend.delete_stream(stream_name) + + return "" diff --git a/moto/kinesis/urls.py b/moto/kinesis/urls.py new file mode 100644 index 000000000..e55bfcbef --- /dev/null +++ b/moto/kinesis/urls.py @@ -0,0 +1,10 @@ +from __future__ import unicode_literals +from .responses import KinesisResponse + +url_bases = [ + "https?://kinesis.(.+).amazonaws.com", +] + +url_paths = { + '{0}/$': KinesisResponse().dispatch, +} diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py new file mode 100644 index 000000000..8272ec75e --- /dev/null +++ b/tests/test_kinesis/test_kinesis.py @@ -0,0 +1,48 @@ +from __future__ import unicode_literals + +import boto.kinesis +from boto.kinesis.exceptions import ResourceNotFoundException +import sure # noqa + +from moto import mock_kinesis + + +@mock_kinesis +def test_create_cluster(): + conn = boto.kinesis.connect_to_region("us-west-2") + + conn.create_stream("my_stream", 2) + + stream_response = conn.describe_stream("my_stream") + + stream = stream_response["StreamDescription"] + stream["StreamName"].should.equal("my_stream") + stream["HasMoreShards"].should.equal(False) + stream["StreamARN"].should.equal("arn:aws:kinesis:us-west-2:123456789012:my_stream") + stream["StreamStatus"].should.equal("ACTIVE") + + shards = stream['Shards'] + shards.should.have.length_of(2) + + +@mock_kinesis +def test_describe_non_existant_stream(): + conn = boto.kinesis.connect_to_region("us-east-1") + conn.describe_stream.when.called_with("not-a-stream").should.throw(ResourceNotFoundException) + + +@mock_kinesis +def test_list_and_delete_stream(): + conn = boto.kinesis.connect_to_region("us-west-2") + + conn.create_stream("stream1", 1) + conn.create_stream("stream2", 1) + + conn.list_streams()['StreamNames'].should.have.length_of(2) + + conn.delete_stream("stream2") + + conn.list_streams()['StreamNames'].should.have.length_of(1) + + # Delete invalid id + conn.delete_stream.when.called_with("not-a-stream").should.throw(ResourceNotFoundException) diff --git a/tests/test_kinesis/test_server.py b/tests/test_kinesis/test_server.py new file mode 100644 index 000000000..527310d75 --- /dev/null +++ b/tests/test_kinesis/test_server.py @@ -0,0 +1,25 @@ +from __future__ import unicode_literals + +import json +import sure # noqa + +import moto.server as server +from moto import mock_kinesis + +''' +Test the different server responses +''' + + +@mock_kinesis +def test_list_streams(): + backend = server.create_backend_app("kinesis") + test_client = backend.test_client() + + res = test_client.get('/?Action=ListStreams') + + json_data = json.loads(res.data.decode("utf-8")) + json_data.should.equal({ + "HasMoreStreams": False, + "StreamNames": [], + }) From 38a4734f95d6d6455d11725cd06b291cfe963348 Mon Sep 17 00:00:00 2001 From: Steve Pulec Date: Wed, 26 Nov 2014 11:13:43 -0500 Subject: [PATCH 2/5] Fixes for py26 and py33. --- moto/kinesis/exceptions.py | 2 +- moto/kinesis/responses.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/moto/kinesis/exceptions.py b/moto/kinesis/exceptions.py index 981a577ec..4ba4c8bf8 100644 --- a/moto/kinesis/exceptions.py +++ b/moto/kinesis/exceptions.py @@ -16,4 +16,4 @@ class ResourceNotFoundError(BadRequest): class StreamNotFoundError(ResourceNotFoundError): def __init__(self, stream_name): super(StreamNotFoundError, self).__init__( - 'Stream {} under account 123456789012 not found.'.format(stream_name)) + 'Stream {0} under account 123456789012 not found.'.format(stream_name)) diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index fe57cbfd1..43ed8b06a 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -10,7 +10,7 @@ class KinesisResponse(BaseResponse): @property def parameters(self): - return json.loads(self.body) + return json.loads(self.body.decode("utf-8")) @property def kinesis_backend(self): From 57d45aa4b8f1e4cc589e9a8954342881cd26f47d Mon Sep 17 00:00:00 2001 From: Steve Pulec Date: Wed, 26 Nov 2014 20:49:21 -0500 Subject: [PATCH 3/5] Add more shard iterator types. --- moto/kinesis/exceptions.py | 15 +++ moto/kinesis/models.py | 144 +++++++++++++++++---- moto/kinesis/responses.py | 41 ++++++ moto/kinesis/utils.py | 31 +++++ tests/test_kinesis/test_kinesis.py | 193 ++++++++++++++++++++++++++++- 5 files changed, 402 insertions(+), 22 deletions(-) create mode 100644 moto/kinesis/utils.py diff --git a/moto/kinesis/exceptions.py b/moto/kinesis/exceptions.py index 4ba4c8bf8..c6be76885 100644 --- a/moto/kinesis/exceptions.py +++ b/moto/kinesis/exceptions.py @@ -17,3 +17,18 @@ class StreamNotFoundError(ResourceNotFoundError): def __init__(self, stream_name): super(StreamNotFoundError, self).__init__( 'Stream {0} under account 123456789012 not found.'.format(stream_name)) + + +class ShardNotFoundError(ResourceNotFoundError): + def __init__(self, shard_id): + super(ShardNotFoundError, self).__init__( + 'Shard {0} under account 123456789012 not found.'.format(shard_id)) + + +class InvalidArgumentError(BadRequest): + def __init__(self, message): + super(InvalidArgumentError, self).__init__() + self.description = json.dumps({ + "message": message, + '__type': 'InvalidArgumentException', + }) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 04fb0550d..0a3eef3cd 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -2,7 +2,74 @@ from __future__ import unicode_literals import boto.kinesis from moto.core import BaseBackend -from .exceptions import StreamNotFoundError +from .exceptions import StreamNotFoundError, ShardNotFoundError +from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose_shard_iterator + +try: + from collections import OrderedDict +except ImportError: + # python 2.6 or earlier, use backport + from ordereddict import OrderedDict + + +class Record(object): + def __init__(self, partition_key, data, sequence_number): + self.partition_key = partition_key + self.data = data + self.sequence_number = sequence_number + + def to_json(self): + return { + "Data": self.data, + "PartitionKey": self.partition_key, + "SequenceNumber": str(self.sequence_number), + } + + +class Shard(object): + def __init__(self, shard_id): + self.shard_id = shard_id + self.records = OrderedDict() + + def get_records(self, last_sequence_id, limit): + last_sequence_id = int(last_sequence_id) + results = [] + + for sequence_number, record in self.records.items(): + if sequence_number > last_sequence_id: + results.append(record) + last_sequence_id = sequence_number + + if len(results) == limit: + break + + return results, last_sequence_id + + def put_record(self, partition_key, data): + # Note: this function is not safe for concurrency + if self.records: + last_sequence_number = self.get_max_sequence_number() + else: + last_sequence_number = 0 + sequence_number = last_sequence_number + 1 + self.records[sequence_number] = Record(partition_key, data, sequence_number) + return sequence_number + + def get_max_sequence_number(self): + return self.records.keys()[-1] + + def to_json(self): + return { + "HashKeyRange": { + "EndingHashKey": "113427455640312821154458202477256070484", + "StartingHashKey": "0" + }, + "SequenceNumberRange": { + "EndingSequenceNumber": "21269319989741826081360214168359141376", + "StartingSequenceNumber": "21267647932558653966460912964485513216" + }, + "ShardId": self.shard_id + } class Stream(object): @@ -11,6 +78,11 @@ class Stream(object): self.shard_count = shard_count self.region = region self.account_number = "123456789012" + self.shards = {} + + for index in range(shard_count): + shard_id = "shardId-{}".format(str(index).zfill(12)) + self.shards[shard_id] = Shard(shard_id) @property def arn(self): @@ -20,6 +92,24 @@ class Stream(object): stream_name=self.stream_name ) + def get_shard(self, shard_id): + if shard_id in self.shards: + return self.shards[shard_id] + else: + raise ShardNotFoundError(shard_id) + + def get_shard_for_key(self, partition_key): + # TODO implement sharding + shard = self.shards.values()[0] + return shard + + def put_record(self, partition_key, explicit_hash_key, sequence_number_for_ordering, data): + partition_key = explicit_hash_key if explicit_hash_key else partition_key + shard = self.get_shard_for_key(partition_key) + + sequence_number = shard.put_record(partition_key, data) + return sequence_number, shard.shard_id + def to_json(self): return { "StreamDescription": { @@ -27,26 +117,7 @@ class Stream(object): "StreamName": self.stream_name, "StreamStatus": "ACTIVE", "HasMoreShards": False, - "Shards": [{ - "HashKeyRange": { - "EndingHashKey": "113427455640312821154458202477256070484", - "StartingHashKey": "0" - }, - "SequenceNumberRange": { - "EndingSequenceNumber": "21269319989741826081360214168359141376", - "StartingSequenceNumber": "21267647932558653966460912964485513216" - }, - "ShardId": "shardId-000000000000" - }, { - "HashKeyRange": { - "EndingHashKey": "226854911280625642308916404954512140969", - "StartingHashKey": "113427455640312821154458202477256070485" - }, - "SequenceNumberRange": { - "StartingSequenceNumber": "21267647932558653966460912964485513217" - }, - "ShardId": "shardId-000000000001" - }], + "Shards": [shard.to_json() for shard in self.shards.values()], } } @@ -75,6 +146,37 @@ class KinesisBackend(BaseBackend): return self.streams.pop(stream_name) raise StreamNotFoundError(stream_name) + def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, starting_sequence_number): + # Validate params + stream = self.describe_stream(stream_name) + shard = stream.get_shard(shard_id) + + shard_iterator = compose_new_shard_iterator( + stream_name, shard, shard_iterator_type, starting_sequence_number + ) + return shard_iterator + + def get_records(self, shard_iterator, limit): + decomposed = decompose_shard_iterator(shard_iterator) + stream_name, shard_id, last_sequence_id = decomposed + + stream = self.describe_stream(stream_name) + shard = stream.get_shard(shard_id) + + records, last_sequence_id = shard.get_records(last_sequence_id, limit) + + next_shard_iterator = compose_shard_iterator(stream_name, shard, last_sequence_id) + + return next_shard_iterator, records + + def put_record(self, stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data): + stream = self.describe_stream(stream_name) + + sequence_number, shard_id = stream.put_record( + partition_key, explicit_hash_key, sequence_number_for_ordering, data + ) + + return sequence_number, shard_id kinesis_backends = {} for region in boto.kinesis.regions(): diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index 43ed8b06a..4b5f13729 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -40,3 +40,44 @@ class KinesisResponse(BaseResponse): self.kinesis_backend.delete_stream(stream_name) return "" + + def get_shard_iterator(self): + stream_name = self.parameters.get("StreamName") + shard_id = self.parameters.get("ShardId") + shard_iterator_type = self.parameters.get("ShardIteratorType") + starting_sequence_number = self.parameters.get("StartingSequenceNumber") + + shard_iterator = self.kinesis_backend.get_shard_iterator( + stream_name, shard_id, shard_iterator_type, starting_sequence_number, + ) + + return json.dumps({ + "ShardIterator": shard_iterator + }) + + def get_records(self): + shard_iterator = self.parameters.get("ShardIterator") + limit = self.parameters.get("Limit") + + next_shard_iterator, records = self.kinesis_backend.get_records(shard_iterator, limit) + + return json.dumps({ + "NextShardIterator": next_shard_iterator, + "Records": [record.to_json() for record in records] + }) + + def put_record(self): + stream_name = self.parameters.get("StreamName") + partition_key = self.parameters.get("PartitionKey") + explicit_hash_key = self.parameters.get("ExplicitHashKey") + sequence_number_for_ordering = self.parameters.get("SequenceNumberForOrdering") + data = self.parameters.get("Data") + + sequence_number, shard_id = self.kinesis_backend.put_record( + stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data + ) + + return json.dumps({ + "SequenceNumber": sequence_number, + "ShardId": shard_id, + }) diff --git a/moto/kinesis/utils.py b/moto/kinesis/utils.py new file mode 100644 index 000000000..c998fe295 --- /dev/null +++ b/moto/kinesis/utils.py @@ -0,0 +1,31 @@ +import base64 + +from .exceptions import InvalidArgumentError + + +def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number): + if shard_iterator_type == "AT_SEQUENCE_NUMBER": + last_sequence_id = int(starting_sequence_number) - 1 + elif shard_iterator_type == "AFTER_SEQUENCE_NUMBER": + last_sequence_id = int(starting_sequence_number) + elif shard_iterator_type == "TRIM_HORIZON": + last_sequence_id = 0 + elif shard_iterator_type == "LATEST": + last_sequence_id = shard.get_max_sequence_number() + else: + raise InvalidArgumentError("Invalid ShardIteratorType: {0}".format(shard_iterator_type)) + return compose_shard_iterator(stream_name, shard, last_sequence_id) + + +def compose_shard_iterator(stream_name, shard, last_sequence_id): + return base64.encodestring( + "{0}:{1}:{2}".format( + stream_name, + shard.shard_id, + last_sequence_id, + ) + ) + + +def decompose_shard_iterator(shard_iterator): + return base64.decodestring(shard_iterator).split(":") diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index 8272ec75e..b85dc1dbd 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -1,7 +1,7 @@ from __future__ import unicode_literals import boto.kinesis -from boto.kinesis.exceptions import ResourceNotFoundException +from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException import sure # noqa from moto import mock_kinesis @@ -46,3 +46,194 @@ def test_list_and_delete_stream(): # Delete invalid id conn.delete_stream.when.called_with("not-a-stream").should.throw(ResourceNotFoundException) + + +@mock_kinesis +def test_basic_shard_iterator(): + conn = boto.kinesis.connect_to_region("us-west-2") + + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + response = conn.get_records(shard_iterator) + shard_iterator = response['NextShardIterator'] + response['Records'].should.equal([]) + + +@mock_kinesis +def test_get_invalid_shard_iterator(): + conn = boto.kinesis.connect_to_region("us-west-2") + + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + conn.get_shard_iterator.when.called_with(stream_name, "123", 'TRIM_HORIZON').should.throw(ResourceNotFoundException) + + +@mock_kinesis +def test_put_records(): + conn = boto.kinesis.connect_to_region("us-west-2") + + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + data = "hello world" + partition_key = "1234" + conn.put_record(stream_name, data, partition_key) + + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + response = conn.get_records(shard_iterator) + shard_iterator = response['NextShardIterator'] + response['Records'].should.have.length_of(1) + record = response['Records'][0] + + record["Data"].should.equal("hello world") + record["PartitionKey"].should.equal("1234") + record["SequenceNumber"].should.equal("1") + + +@mock_kinesis +def test_get_records_limit(): + conn = boto.kinesis.connect_to_region("us-west-2") + + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + # Create some data + data = "hello world" + for index in range(5): + conn.put_record(stream_name, data, index) + + # Get a shard iterator + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + # Retrieve only 3 records + response = conn.get_records(shard_iterator, limit=3) + response['Records'].should.have.length_of(3) + + # Then get the rest of the results + next_shard_iterator = response['NextShardIterator'] + response = conn.get_records(next_shard_iterator) + response['Records'].should.have.length_of(2) + + +@mock_kinesis +def test_get_records_at_sequence_number(): + # AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by a specific sequence number. + conn = boto.kinesis.connect_to_region("us-west-2") + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + # Create some data + for index in range(1, 5): + conn.put_record(stream_name, str(index), index) + + # Get a shard iterator + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + # Get the second record + response = conn.get_records(shard_iterator, limit=2) + second_sequence_id = response['Records'][1]['SequenceNumber'] + + # Then get a new iterator starting at that id + response = conn.get_shard_iterator(stream_name, shard_id, 'AT_SEQUENCE_NUMBER', second_sequence_id) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(shard_iterator) + # And the first result returned should be the second item + response['Records'][0]['SequenceNumber'].should.equal(second_sequence_id) + response['Records'][0]['Data'].should.equal('2') + + +@mock_kinesis +def test_get_records_after_sequence_number(): + # AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted by a specific sequence number. + conn = boto.kinesis.connect_to_region("us-west-2") + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + # Create some data + for index in range(1, 5): + conn.put_record(stream_name, str(index), index) + + # Get a shard iterator + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + # Get the second record + response = conn.get_records(shard_iterator, limit=2) + second_sequence_id = response['Records'][1]['SequenceNumber'] + + # Then get a new iterator starting after that id + response = conn.get_shard_iterator(stream_name, shard_id, 'AFTER_SEQUENCE_NUMBER', second_sequence_id) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(shard_iterator) + # And the first result returned should be the third item + response['Records'][0]['Data'].should.equal('3') + + +@mock_kinesis +def test_get_records_latest(): + # LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard. + conn = boto.kinesis.connect_to_region("us-west-2") + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + # Create some data + for index in range(1, 5): + conn.put_record(stream_name, str(index), index) + + # Get a shard iterator + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + # Get the second record + response = conn.get_records(shard_iterator, limit=2) + second_sequence_id = response['Records'][1]['SequenceNumber'] + + # Then get a new iterator starting after that id + response = conn.get_shard_iterator(stream_name, shard_id, 'LATEST', second_sequence_id) + shard_iterator = response['ShardIterator'] + + # Write some more data + conn.put_record(stream_name, "last_record", "last_record") + + response = conn.get_records(shard_iterator) + # And the only result returned should be the new item + response['Records'].should.have.length_of(1) + response['Records'][0]['PartitionKey'].should.equal('last_record') + response['Records'][0]['Data'].should.equal('last_record') + + +@mock_kinesis +def test_invalid_shard_iterator_type(): + conn = boto.kinesis.connect_to_region("us-west-2") + stream_name = "my_stream" + conn.create_stream(stream_name, 1) + + response = conn.describe_stream(stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator.when.called_with( + stream_name, shard_id, 'invalid-type').should.throw(InvalidArgumentException) From c63b00b8ba572d90e8abfb2f40213641c0088e8a Mon Sep 17 00:00:00 2001 From: Steve Pulec Date: Wed, 26 Nov 2014 21:04:44 -0500 Subject: [PATCH 4/5] Fixes for other python versions. --- moto/kinesis/models.py | 6 +++--- moto/kinesis/utils.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 0a3eef3cd..6a1b081f2 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -56,7 +56,7 @@ class Shard(object): return sequence_number def get_max_sequence_number(self): - return self.records.keys()[-1] + return list(self.records.keys())[-1] def to_json(self): return { @@ -81,7 +81,7 @@ class Stream(object): self.shards = {} for index in range(shard_count): - shard_id = "shardId-{}".format(str(index).zfill(12)) + shard_id = "shardId-{0}".format(str(index).zfill(12)) self.shards[shard_id] = Shard(shard_id) @property @@ -100,7 +100,7 @@ class Stream(object): def get_shard_for_key(self, partition_key): # TODO implement sharding - shard = self.shards.values()[0] + shard = list(self.shards.values())[0] return shard def put_record(self, partition_key, explicit_hash_key, sequence_number_for_ordering, data): diff --git a/moto/kinesis/utils.py b/moto/kinesis/utils.py index c998fe295..0d35b4134 100644 --- a/moto/kinesis/utils.py +++ b/moto/kinesis/utils.py @@ -23,9 +23,9 @@ def compose_shard_iterator(stream_name, shard, last_sequence_id): stream_name, shard.shard_id, last_sequence_id, - ) - ) + ).encode("utf-8") + ).decode("utf-8") def decompose_shard_iterator(shard_iterator): - return base64.decodestring(shard_iterator).split(":") + return base64.decodestring(shard_iterator.encode("utf-8")).decode("utf-8").split(":") From 45de4a46eceb411bc3fff082fed47441e62d51f0 Mon Sep 17 00:00:00 2001 From: Steve Pulec Date: Wed, 26 Nov 2014 21:55:01 -0500 Subject: [PATCH 5/5] Better sequence start and end numbers. --- moto/kinesis/models.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index 6a1b081f2..52a5e91f7 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -55,8 +55,15 @@ class Shard(object): self.records[sequence_number] = Record(partition_key, data, sequence_number) return sequence_number + def get_min_sequence_number(self): + if self.records: + return list(self.records.keys())[0] + return 0 + def get_max_sequence_number(self): - return list(self.records.keys())[-1] + if self.records: + return list(self.records.keys())[-1] + return 0 def to_json(self): return { @@ -65,8 +72,8 @@ class Shard(object): "StartingHashKey": "0" }, "SequenceNumberRange": { - "EndingSequenceNumber": "21269319989741826081360214168359141376", - "StartingSequenceNumber": "21267647932558653966460912964485513216" + "EndingSequenceNumber": self.get_max_sequence_number(), + "StartingSequenceNumber": self.get_min_sequence_number(), }, "ShardId": self.shard_id }