Merge pull request #266 from spulec/kinesis

Kinesis
This commit is contained in:
Steve Pulec 2014-11-26 22:05:43 -05:00
commit c01f7fe95b
11 changed files with 633 additions and 0 deletions

View File

@ -11,6 +11,7 @@ from .ec2 import mock_ec2 # flake8: noqa
from .elb import mock_elb # flake8: noqa
from .emr import mock_emr # flake8: noqa
from .iam import mock_iam # flake8: noqa
from .kinesis import mock_kinesis # flake8: noqa
from .redshift import mock_redshift # flake8: noqa
from .s3 import mock_s3 # flake8: noqa
from .s3bucket_path import mock_s3bucket_path # flake8: noqa

View File

@ -6,6 +6,7 @@ from moto.dynamodb2 import dynamodb_backend2
from moto.ec2 import ec2_backend
from moto.elb import elb_backend
from moto.emr import emr_backend
from moto.kinesis import kinesis_backend
from moto.redshift import redshift_backend
from moto.s3 import s3_backend
from moto.s3bucket_path import s3bucket_path_backend
@ -22,6 +23,7 @@ BACKENDS = {
'ec2': ec2_backend,
'elb': elb_backend,
'emr': emr_backend,
'kinesis': kinesis_backend,
'redshift': redshift_backend,
's3': s3_backend,
's3bucket_path': s3bucket_path_backend,

View File

@ -90,6 +90,12 @@ class BaseResponse(object):
def call_action(self):
headers = self.response_headers
action = self.querystring.get('Action', [""])[0]
if not action: # Some services use a header for the action
# Headers are case-insensitive. Probably a better way to do this.
match = self.headers.get('x-amz-target') or self.headers.get('X-Amz-Target')
if match:
action = match.split(".")[1]
action = camelcase_to_underscores(action)
method_names = method_names_from_class(self.__class__)
if action in method_names:

12
moto/kinesis/__init__.py Normal file
View File

@ -0,0 +1,12 @@
from __future__ import unicode_literals
from .models import kinesis_backends
from ..core.models import MockAWS
kinesis_backend = kinesis_backends['us-east-1']
def mock_kinesis(func=None):
if func:
return MockAWS(kinesis_backends)(func)
else:
return MockAWS(kinesis_backends)

View File

@ -0,0 +1,34 @@
from __future__ import unicode_literals
import json
from werkzeug.exceptions import BadRequest
class ResourceNotFoundError(BadRequest):
def __init__(self, message):
super(ResourceNotFoundError, self).__init__()
self.description = json.dumps({
"message": message,
'__type': 'ResourceNotFoundException',
})
class StreamNotFoundError(ResourceNotFoundError):
def __init__(self, stream_name):
super(StreamNotFoundError, self).__init__(
'Stream {0} under account 123456789012 not found.'.format(stream_name))
class ShardNotFoundError(ResourceNotFoundError):
def __init__(self, shard_id):
super(ShardNotFoundError, self).__init__(
'Shard {0} under account 123456789012 not found.'.format(shard_id))
class InvalidArgumentError(BadRequest):
def __init__(self, message):
super(InvalidArgumentError, self).__init__()
self.description = json.dumps({
"message": message,
'__type': 'InvalidArgumentException',
})

190
moto/kinesis/models.py Normal file
View File

@ -0,0 +1,190 @@
from __future__ import unicode_literals
import boto.kinesis
from moto.core import BaseBackend
from .exceptions import StreamNotFoundError, ShardNotFoundError
from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose_shard_iterator
try:
from collections import OrderedDict
except ImportError:
# python 2.6 or earlier, use backport
from ordereddict import OrderedDict
class Record(object):
def __init__(self, partition_key, data, sequence_number):
self.partition_key = partition_key
self.data = data
self.sequence_number = sequence_number
def to_json(self):
return {
"Data": self.data,
"PartitionKey": self.partition_key,
"SequenceNumber": str(self.sequence_number),
}
class Shard(object):
def __init__(self, shard_id):
self.shard_id = shard_id
self.records = OrderedDict()
def get_records(self, last_sequence_id, limit):
last_sequence_id = int(last_sequence_id)
results = []
for sequence_number, record in self.records.items():
if sequence_number > last_sequence_id:
results.append(record)
last_sequence_id = sequence_number
if len(results) == limit:
break
return results, last_sequence_id
def put_record(self, partition_key, data):
# Note: this function is not safe for concurrency
if self.records:
last_sequence_number = self.get_max_sequence_number()
else:
last_sequence_number = 0
sequence_number = last_sequence_number + 1
self.records[sequence_number] = Record(partition_key, data, sequence_number)
return sequence_number
def get_min_sequence_number(self):
if self.records:
return list(self.records.keys())[0]
return 0
def get_max_sequence_number(self):
if self.records:
return list(self.records.keys())[-1]
return 0
def to_json(self):
return {
"HashKeyRange": {
"EndingHashKey": "113427455640312821154458202477256070484",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"EndingSequenceNumber": self.get_max_sequence_number(),
"StartingSequenceNumber": self.get_min_sequence_number(),
},
"ShardId": self.shard_id
}
class Stream(object):
def __init__(self, stream_name, shard_count, region):
self.stream_name = stream_name
self.shard_count = shard_count
self.region = region
self.account_number = "123456789012"
self.shards = {}
for index in range(shard_count):
shard_id = "shardId-{0}".format(str(index).zfill(12))
self.shards[shard_id] = Shard(shard_id)
@property
def arn(self):
return "arn:aws:kinesis:{region}:{account_number}:{stream_name}".format(
region=self.region,
account_number=self.account_number,
stream_name=self.stream_name
)
def get_shard(self, shard_id):
if shard_id in self.shards:
return self.shards[shard_id]
else:
raise ShardNotFoundError(shard_id)
def get_shard_for_key(self, partition_key):
# TODO implement sharding
shard = list(self.shards.values())[0]
return shard
def put_record(self, partition_key, explicit_hash_key, sequence_number_for_ordering, data):
partition_key = explicit_hash_key if explicit_hash_key else partition_key
shard = self.get_shard_for_key(partition_key)
sequence_number = shard.put_record(partition_key, data)
return sequence_number, shard.shard_id
def to_json(self):
return {
"StreamDescription": {
"StreamARN": self.arn,
"StreamName": self.stream_name,
"StreamStatus": "ACTIVE",
"HasMoreShards": False,
"Shards": [shard.to_json() for shard in self.shards.values()],
}
}
class KinesisBackend(BaseBackend):
def __init__(self):
self.streams = {}
def create_stream(self, stream_name, shard_count, region):
stream = Stream(stream_name, shard_count, region)
self.streams[stream_name] = stream
return stream
def describe_stream(self, stream_name):
if stream_name in self.streams:
return self.streams[stream_name]
else:
raise StreamNotFoundError(stream_name)
def list_streams(self):
return self.streams.values()
def delete_stream(self, stream_name):
if stream_name in self.streams:
return self.streams.pop(stream_name)
raise StreamNotFoundError(stream_name)
def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, starting_sequence_number):
# Validate params
stream = self.describe_stream(stream_name)
shard = stream.get_shard(shard_id)
shard_iterator = compose_new_shard_iterator(
stream_name, shard, shard_iterator_type, starting_sequence_number
)
return shard_iterator
def get_records(self, shard_iterator, limit):
decomposed = decompose_shard_iterator(shard_iterator)
stream_name, shard_id, last_sequence_id = decomposed
stream = self.describe_stream(stream_name)
shard = stream.get_shard(shard_id)
records, last_sequence_id = shard.get_records(last_sequence_id, limit)
next_shard_iterator = compose_shard_iterator(stream_name, shard, last_sequence_id)
return next_shard_iterator, records
def put_record(self, stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data):
stream = self.describe_stream(stream_name)
sequence_number, shard_id = stream.put_record(
partition_key, explicit_hash_key, sequence_number_for_ordering, data
)
return sequence_number, shard_id
kinesis_backends = {}
for region in boto.kinesis.regions():
kinesis_backends[region.name] = KinesisBackend()

83
moto/kinesis/responses.py Normal file
View File

@ -0,0 +1,83 @@
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import kinesis_backends
class KinesisResponse(BaseResponse):
@property
def parameters(self):
return json.loads(self.body.decode("utf-8"))
@property
def kinesis_backend(self):
return kinesis_backends[self.region]
def create_stream(self):
stream_name = self.parameters.get('StreamName')
shard_count = self.parameters.get('ShardCount')
self.kinesis_backend.create_stream(stream_name, shard_count, self.region)
return ""
def describe_stream(self):
stream_name = self.parameters.get('StreamName')
stream = self.kinesis_backend.describe_stream(stream_name)
return json.dumps(stream.to_json())
def list_streams(self):
streams = self.kinesis_backend.list_streams()
return json.dumps({
"HasMoreStreams": False,
"StreamNames": [stream.stream_name for stream in streams],
})
def delete_stream(self):
stream_name = self.parameters.get("StreamName")
self.kinesis_backend.delete_stream(stream_name)
return ""
def get_shard_iterator(self):
stream_name = self.parameters.get("StreamName")
shard_id = self.parameters.get("ShardId")
shard_iterator_type = self.parameters.get("ShardIteratorType")
starting_sequence_number = self.parameters.get("StartingSequenceNumber")
shard_iterator = self.kinesis_backend.get_shard_iterator(
stream_name, shard_id, shard_iterator_type, starting_sequence_number,
)
return json.dumps({
"ShardIterator": shard_iterator
})
def get_records(self):
shard_iterator = self.parameters.get("ShardIterator")
limit = self.parameters.get("Limit")
next_shard_iterator, records = self.kinesis_backend.get_records(shard_iterator, limit)
return json.dumps({
"NextShardIterator": next_shard_iterator,
"Records": [record.to_json() for record in records]
})
def put_record(self):
stream_name = self.parameters.get("StreamName")
partition_key = self.parameters.get("PartitionKey")
explicit_hash_key = self.parameters.get("ExplicitHashKey")
sequence_number_for_ordering = self.parameters.get("SequenceNumberForOrdering")
data = self.parameters.get("Data")
sequence_number, shard_id = self.kinesis_backend.put_record(
stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data
)
return json.dumps({
"SequenceNumber": sequence_number,
"ShardId": shard_id,
})

10
moto/kinesis/urls.py Normal file
View File

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from .responses import KinesisResponse
url_bases = [
"https?://kinesis.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': KinesisResponse().dispatch,
}

31
moto/kinesis/utils.py Normal file
View File

@ -0,0 +1,31 @@
import base64
from .exceptions import InvalidArgumentError
def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number):
if shard_iterator_type == "AT_SEQUENCE_NUMBER":
last_sequence_id = int(starting_sequence_number) - 1
elif shard_iterator_type == "AFTER_SEQUENCE_NUMBER":
last_sequence_id = int(starting_sequence_number)
elif shard_iterator_type == "TRIM_HORIZON":
last_sequence_id = 0
elif shard_iterator_type == "LATEST":
last_sequence_id = shard.get_max_sequence_number()
else:
raise InvalidArgumentError("Invalid ShardIteratorType: {0}".format(shard_iterator_type))
return compose_shard_iterator(stream_name, shard, last_sequence_id)
def compose_shard_iterator(stream_name, shard, last_sequence_id):
return base64.encodestring(
"{0}:{1}:{2}".format(
stream_name,
shard.shard_id,
last_sequence_id,
).encode("utf-8")
).decode("utf-8")
def decompose_shard_iterator(shard_iterator):
return base64.decodestring(shard_iterator.encode("utf-8")).decode("utf-8").split(":")

View File

@ -0,0 +1,239 @@
from __future__ import unicode_literals
import boto.kinesis
from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException
import sure # noqa
from moto import mock_kinesis
@mock_kinesis
def test_create_cluster():
conn = boto.kinesis.connect_to_region("us-west-2")
conn.create_stream("my_stream", 2)
stream_response = conn.describe_stream("my_stream")
stream = stream_response["StreamDescription"]
stream["StreamName"].should.equal("my_stream")
stream["HasMoreShards"].should.equal(False)
stream["StreamARN"].should.equal("arn:aws:kinesis:us-west-2:123456789012:my_stream")
stream["StreamStatus"].should.equal("ACTIVE")
shards = stream['Shards']
shards.should.have.length_of(2)
@mock_kinesis
def test_describe_non_existant_stream():
conn = boto.kinesis.connect_to_region("us-east-1")
conn.describe_stream.when.called_with("not-a-stream").should.throw(ResourceNotFoundException)
@mock_kinesis
def test_list_and_delete_stream():
conn = boto.kinesis.connect_to_region("us-west-2")
conn.create_stream("stream1", 1)
conn.create_stream("stream2", 1)
conn.list_streams()['StreamNames'].should.have.length_of(2)
conn.delete_stream("stream2")
conn.list_streams()['StreamNames'].should.have.length_of(1)
# Delete invalid id
conn.delete_stream.when.called_with("not-a-stream").should.throw(ResourceNotFoundException)
@mock_kinesis
def test_basic_shard_iterator():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
response = conn.describe_stream(stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON')
shard_iterator = response['ShardIterator']
response = conn.get_records(shard_iterator)
shard_iterator = response['NextShardIterator']
response['Records'].should.equal([])
@mock_kinesis
def test_get_invalid_shard_iterator():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
conn.get_shard_iterator.when.called_with(stream_name, "123", 'TRIM_HORIZON').should.throw(ResourceNotFoundException)
@mock_kinesis
def test_put_records():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
data = "hello world"
partition_key = "1234"
conn.put_record(stream_name, data, partition_key)
response = conn.describe_stream(stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON')
shard_iterator = response['ShardIterator']
response = conn.get_records(shard_iterator)
shard_iterator = response['NextShardIterator']
response['Records'].should.have.length_of(1)
record = response['Records'][0]
record["Data"].should.equal("hello world")
record["PartitionKey"].should.equal("1234")
record["SequenceNumber"].should.equal("1")
@mock_kinesis
def test_get_records_limit():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
# Create some data
data = "hello world"
for index in range(5):
conn.put_record(stream_name, data, index)
# Get a shard iterator
response = conn.describe_stream(stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON')
shard_iterator = response['ShardIterator']
# Retrieve only 3 records
response = conn.get_records(shard_iterator, limit=3)
response['Records'].should.have.length_of(3)
# Then get the rest of the results
next_shard_iterator = response['NextShardIterator']
response = conn.get_records(next_shard_iterator)
response['Records'].should.have.length_of(2)
@mock_kinesis
def test_get_records_at_sequence_number():
# AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by a specific sequence number.
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
# Create some data
for index in range(1, 5):
conn.put_record(stream_name, str(index), index)
# Get a shard iterator
response = conn.describe_stream(stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON')
shard_iterator = response['ShardIterator']
# Get the second record
response = conn.get_records(shard_iterator, limit=2)
second_sequence_id = response['Records'][1]['SequenceNumber']
# Then get a new iterator starting at that id
response = conn.get_shard_iterator(stream_name, shard_id, 'AT_SEQUENCE_NUMBER', second_sequence_id)
shard_iterator = response['ShardIterator']
response = conn.get_records(shard_iterator)
# And the first result returned should be the second item
response['Records'][0]['SequenceNumber'].should.equal(second_sequence_id)
response['Records'][0]['Data'].should.equal('2')
@mock_kinesis
def test_get_records_after_sequence_number():
# AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted by a specific sequence number.
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
# Create some data
for index in range(1, 5):
conn.put_record(stream_name, str(index), index)
# Get a shard iterator
response = conn.describe_stream(stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON')
shard_iterator = response['ShardIterator']
# Get the second record
response = conn.get_records(shard_iterator, limit=2)
second_sequence_id = response['Records'][1]['SequenceNumber']
# Then get a new iterator starting after that id
response = conn.get_shard_iterator(stream_name, shard_id, 'AFTER_SEQUENCE_NUMBER', second_sequence_id)
shard_iterator = response['ShardIterator']
response = conn.get_records(shard_iterator)
# And the first result returned should be the third item
response['Records'][0]['Data'].should.equal('3')
@mock_kinesis
def test_get_records_latest():
# LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
# Create some data
for index in range(1, 5):
conn.put_record(stream_name, str(index), index)
# Get a shard iterator
response = conn.describe_stream(stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator(stream_name, shard_id, 'TRIM_HORIZON')
shard_iterator = response['ShardIterator']
# Get the second record
response = conn.get_records(shard_iterator, limit=2)
second_sequence_id = response['Records'][1]['SequenceNumber']
# Then get a new iterator starting after that id
response = conn.get_shard_iterator(stream_name, shard_id, 'LATEST', second_sequence_id)
shard_iterator = response['ShardIterator']
# Write some more data
conn.put_record(stream_name, "last_record", "last_record")
response = conn.get_records(shard_iterator)
# And the only result returned should be the new item
response['Records'].should.have.length_of(1)
response['Records'][0]['PartitionKey'].should.equal('last_record')
response['Records'][0]['Data'].should.equal('last_record')
@mock_kinesis
def test_invalid_shard_iterator_type():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
response = conn.describe_stream(stream_name)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
response = conn.get_shard_iterator.when.called_with(
stream_name, shard_id, 'invalid-type').should.throw(InvalidArgumentException)

View File

@ -0,0 +1,25 @@
from __future__ import unicode_literals
import json
import sure # noqa
import moto.server as server
from moto import mock_kinesis
'''
Test the different server responses
'''
@mock_kinesis
def test_list_streams():
backend = server.create_backend_app("kinesis")
test_client = backend.test_client()
res = test_client.get('/?Action=ListStreams')
json_data = json.loads(res.data.decode("utf-8"))
json_data.should.equal({
"HasMoreStreams": False,
"StreamNames": [],
})