diff --git a/moto/__init__.py b/moto/__init__.py index 75bd5a53c..8041f0856 100644 --- a/moto/__init__.py +++ b/moto/__init__.py @@ -11,6 +11,7 @@ from .ec2 import mock_ec2 # flake8: noqa from .elb import mock_elb # flake8: noqa from .emr import mock_emr # flake8: noqa from .iam import mock_iam # flake8: noqa +from .kinesis import mock_kinesis # flake8: noqa from .redshift import mock_redshift # flake8: noqa from .s3 import mock_s3 # flake8: noqa from .s3bucket_path import mock_s3bucket_path # flake8: noqa diff --git a/moto/backends.py b/moto/backends.py index d9df2133d..cf6759d99 100644 --- a/moto/backends.py +++ b/moto/backends.py @@ -6,6 +6,7 @@ from moto.dynamodb2 import dynamodb_backend2 from moto.ec2 import ec2_backend from moto.elb import elb_backend from moto.emr import emr_backend +from moto.kinesis import kinesis_backend from moto.redshift import redshift_backend from moto.s3 import s3_backend from moto.s3bucket_path import s3bucket_path_backend @@ -22,6 +23,7 @@ BACKENDS = { 'ec2': ec2_backend, 'elb': elb_backend, 'emr': emr_backend, + 'kinesis': kinesis_backend, 'redshift': redshift_backend, 's3': s3_backend, 's3bucket_path': s3bucket_path_backend, diff --git a/moto/core/responses.py b/moto/core/responses.py index 80e3e77f6..c4b3f38ed 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -90,6 +90,12 @@ class BaseResponse(object): def call_action(self): headers = self.response_headers action = self.querystring.get('Action', [""])[0] + if not action: # Some services use a header for the action + # Headers are case-insensitive. Probably a better way to do this. + match = self.headers.get('x-amz-target') or self.headers.get('X-Amz-Target') + if match: + action = match.split(".")[1] + action = camelcase_to_underscores(action) method_names = method_names_from_class(self.__class__) if action in method_names: diff --git a/moto/kinesis/__init__.py b/moto/kinesis/__init__.py new file mode 100644 index 000000000..415b960e1 --- /dev/null +++ b/moto/kinesis/__init__.py @@ -0,0 +1,12 @@ +from __future__ import unicode_literals +from .models import kinesis_backends +from ..core.models import MockAWS + +kinesis_backend = kinesis_backends['us-east-1'] + + +def mock_kinesis(func=None): + if func: + return MockAWS(kinesis_backends)(func) + else: + return MockAWS(kinesis_backends) diff --git a/moto/kinesis/exceptions.py b/moto/kinesis/exceptions.py new file mode 100644 index 000000000..981a577ec --- /dev/null +++ b/moto/kinesis/exceptions.py @@ -0,0 +1,19 @@ +from __future__ import unicode_literals + +import json +from werkzeug.exceptions import BadRequest + + +class ResourceNotFoundError(BadRequest): + def __init__(self, message): + super(ResourceNotFoundError, self).__init__() + self.description = json.dumps({ + "message": message, + '__type': 'ResourceNotFoundException', + }) + + +class StreamNotFoundError(ResourceNotFoundError): + def __init__(self, stream_name): + super(StreamNotFoundError, self).__init__( + 'Stream {} under account 123456789012 not found.'.format(stream_name)) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py new file mode 100644 index 000000000..04fb0550d --- /dev/null +++ b/moto/kinesis/models.py @@ -0,0 +1,81 @@ +from __future__ import unicode_literals + +import boto.kinesis +from moto.core import BaseBackend +from .exceptions import StreamNotFoundError + + +class Stream(object): + def __init__(self, stream_name, shard_count, region): + self.stream_name = stream_name + self.shard_count = shard_count + self.region = region + self.account_number = "123456789012" + + @property + def arn(self): + return "arn:aws:kinesis:{region}:{account_number}:{stream_name}".format( + region=self.region, + account_number=self.account_number, + stream_name=self.stream_name + ) + + def to_json(self): + return { + "StreamDescription": { + "StreamARN": self.arn, + "StreamName": self.stream_name, + "StreamStatus": "ACTIVE", + "HasMoreShards": False, + "Shards": [{ + "HashKeyRange": { + "EndingHashKey": "113427455640312821154458202477256070484", + "StartingHashKey": "0" + }, + "SequenceNumberRange": { + "EndingSequenceNumber": "21269319989741826081360214168359141376", + "StartingSequenceNumber": "21267647932558653966460912964485513216" + }, + "ShardId": "shardId-000000000000" + }, { + "HashKeyRange": { + "EndingHashKey": "226854911280625642308916404954512140969", + "StartingHashKey": "113427455640312821154458202477256070485" + }, + "SequenceNumberRange": { + "StartingSequenceNumber": "21267647932558653966460912964485513217" + }, + "ShardId": "shardId-000000000001" + }], + } + } + + +class KinesisBackend(BaseBackend): + + def __init__(self): + self.streams = {} + + def create_stream(self, stream_name, shard_count, region): + stream = Stream(stream_name, shard_count, region) + self.streams[stream_name] = stream + return stream + + def describe_stream(self, stream_name): + if stream_name in self.streams: + return self.streams[stream_name] + else: + raise StreamNotFoundError(stream_name) + + def list_streams(self): + return self.streams.values() + + def delete_stream(self, stream_name): + if stream_name in self.streams: + return self.streams.pop(stream_name) + raise StreamNotFoundError(stream_name) + + +kinesis_backends = {} +for region in boto.kinesis.regions(): + kinesis_backends[region.name] = KinesisBackend() diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py new file mode 100644 index 000000000..fe57cbfd1 --- /dev/null +++ b/moto/kinesis/responses.py @@ -0,0 +1,42 @@ +from __future__ import unicode_literals + +import json + +from moto.core.responses import BaseResponse +from .models import kinesis_backends + + +class KinesisResponse(BaseResponse): + + @property + def parameters(self): + return json.loads(self.body) + + @property + def kinesis_backend(self): + return kinesis_backends[self.region] + + def create_stream(self): + stream_name = self.parameters.get('StreamName') + shard_count = self.parameters.get('ShardCount') + self.kinesis_backend.create_stream(stream_name, shard_count, self.region) + return "" + + def describe_stream(self): + stream_name = self.parameters.get('StreamName') + stream = self.kinesis_backend.describe_stream(stream_name) + return json.dumps(stream.to_json()) + + def list_streams(self): + streams = self.kinesis_backend.list_streams() + + return json.dumps({ + "HasMoreStreams": False, + "StreamNames": [stream.stream_name for stream in streams], + }) + + def delete_stream(self): + stream_name = self.parameters.get("StreamName") + self.kinesis_backend.delete_stream(stream_name) + + return "" diff --git a/moto/kinesis/urls.py b/moto/kinesis/urls.py new file mode 100644 index 000000000..e55bfcbef --- /dev/null +++ b/moto/kinesis/urls.py @@ -0,0 +1,10 @@ +from __future__ import unicode_literals +from .responses import KinesisResponse + +url_bases = [ + "https?://kinesis.(.+).amazonaws.com", +] + +url_paths = { + '{0}/$': KinesisResponse().dispatch, +} diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py new file mode 100644 index 000000000..8272ec75e --- /dev/null +++ b/tests/test_kinesis/test_kinesis.py @@ -0,0 +1,48 @@ +from __future__ import unicode_literals + +import boto.kinesis +from boto.kinesis.exceptions import ResourceNotFoundException +import sure # noqa + +from moto import mock_kinesis + + +@mock_kinesis +def test_create_cluster(): + conn = boto.kinesis.connect_to_region("us-west-2") + + conn.create_stream("my_stream", 2) + + stream_response = conn.describe_stream("my_stream") + + stream = stream_response["StreamDescription"] + stream["StreamName"].should.equal("my_stream") + stream["HasMoreShards"].should.equal(False) + stream["StreamARN"].should.equal("arn:aws:kinesis:us-west-2:123456789012:my_stream") + stream["StreamStatus"].should.equal("ACTIVE") + + shards = stream['Shards'] + shards.should.have.length_of(2) + + +@mock_kinesis +def test_describe_non_existant_stream(): + conn = boto.kinesis.connect_to_region("us-east-1") + conn.describe_stream.when.called_with("not-a-stream").should.throw(ResourceNotFoundException) + + +@mock_kinesis +def test_list_and_delete_stream(): + conn = boto.kinesis.connect_to_region("us-west-2") + + conn.create_stream("stream1", 1) + conn.create_stream("stream2", 1) + + conn.list_streams()['StreamNames'].should.have.length_of(2) + + conn.delete_stream("stream2") + + conn.list_streams()['StreamNames'].should.have.length_of(1) + + # Delete invalid id + conn.delete_stream.when.called_with("not-a-stream").should.throw(ResourceNotFoundException) diff --git a/tests/test_kinesis/test_server.py b/tests/test_kinesis/test_server.py new file mode 100644 index 000000000..527310d75 --- /dev/null +++ b/tests/test_kinesis/test_server.py @@ -0,0 +1,25 @@ +from __future__ import unicode_literals + +import json +import sure # noqa + +import moto.server as server +from moto import mock_kinesis + +''' +Test the different server responses +''' + + +@mock_kinesis +def test_list_streams(): + backend = server.create_backend_app("kinesis") + test_client = backend.test_client() + + res = test_client.get('/?Action=ListStreams') + + json_data = json.loads(res.data.decode("utf-8")) + json_data.should.equal({ + "HasMoreStreams": False, + "StreamNames": [], + })