# moto/moto/kinesis/models.py
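"""In-memory models backing moto's Kinesis and Kinesis Firehose mocks:
streams, shards, records, delivery streams, and the per-region backend."""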

from __future__ import unicode_literals
import datetime
import time
import re
import six
import itertools
from operator import attrgetter
from hashlib import md5
from boto3 import Session
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
from moto.core.utils import unix_time
from moto.core import ACCOUNT_ID
from .exceptions import (
StreamNotFoundError,
ShardNotFoundError,
ResourceInUseError,
ResourceNotFoundError,
InvalidArgumentError,
)
from .utils import (
compose_shard_iterator,
compose_new_shard_iterator,
decompose_shard_iterator,
)
class Record(BaseModel):
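    """A single data record stored in a shard: the payload, its partition key,
    an explicit hash key (if any), a sequence number, and the arrival time."""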
def __init__(self, partition_key, data, sequence_number, explicit_hash_key):
self.partition_key = partition_key
self.data = data
self.sequence_number = sequence_number
self.explicit_hash_key = explicit_hash_key
self.created_at_datetime = datetime.datetime.utcnow()
self.created_at = unix_time(self.created_at_datetime)
def to_json(self):
return {
"Data": self.data,
"PartitionKey": self.partition_key,
"SequenceNumber": str(self.sequence_number),
"ApproximateArrivalTimestamp": self.created_at_datetime.isoformat(),
}
class Shard(BaseModel):
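    """One shard of a stream, owning the hash key range
    [starting_hash, ending_hash) and its records keyed by sequence number."""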
def __init__(self, shard_id, starting_hash, ending_hash):
self._shard_id = shard_id
self.starting_hash = starting_hash
self.ending_hash = ending_hash
self.records = OrderedDict()
@property
def shard_id(self):
return "shardId-{0}".format(str(self._shard_id).zfill(12))
def get_records(self, last_sequence_id, limit):
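        """Return up to `limit` records with sequence numbers greater than
        `last_sequence_id`, the new cursor position, and the number of
        milliseconds the cursor lags behind the newest record in the shard."""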
last_sequence_id = int(last_sequence_id)
results = []
secs_behind_latest = 0
for sequence_number, record in self.records.items():
if sequence_number > last_sequence_id:
results.append(record)
last_sequence_id = sequence_number
very_last_record = self.records[next(reversed(self.records))]
secs_behind_latest = very_last_record.created_at - record.created_at
if len(results) == limit:
break
millis_behind_latest = int(secs_behind_latest * 1000)
return results, last_sequence_id, millis_behind_latest
def put_record(self, partition_key, data, explicit_hash_key):
# Note: this function is not safe for concurrency
if self.records:
last_sequence_number = self.get_max_sequence_number()
else:
last_sequence_number = 0
sequence_number = last_sequence_number + 1
self.records[sequence_number] = Record(
partition_key, data, sequence_number, explicit_hash_key
)
return sequence_number
def get_min_sequence_number(self):
if self.records:
return list(self.records.keys())[0]
return 0
def get_max_sequence_number(self):
if self.records:
return list(self.records.keys())[-1]
return 0
def get_sequence_number_at(self, at_timestamp):
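        """Return the sequence number of the last record created before
        `at_timestamp`, or 0 if the shard is empty or the timestamp predates
        every record in it."""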
if not self.records or at_timestamp < list(self.records.values())[0].created_at:
return 0
else:
# find the last item in the list that was created before
# at_timestamp
r = next(
(
r
for r in reversed(self.records.values())
if r.created_at < at_timestamp
),
None,
)
            if r is None:
                return 0
            return r.sequence_number
def to_json(self):
return {
"HashKeyRange": {
"EndingHashKey": str(self.ending_hash),
"StartingHashKey": str(self.starting_hash),
},
"SequenceNumberRange": {
"EndingSequenceNumber": self.get_max_sequence_number(),
"StartingSequenceNumber": self.get_min_sequence_number(),
},
"ShardId": self.shard_id,
}
class Stream(BaseModel):
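    """A Kinesis stream: splits the 128-bit hash key space evenly across
    `shard_count` shards at creation time."""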
def __init__(self, stream_name, shard_count, region):
self.stream_name = stream_name
self.shard_count = shard_count
self.creation_datetime = datetime.datetime.now()
self.region = region
self.account_number = ACCOUNT_ID
self.shards = {}
self.tags = {}
self.status = "ACTIVE"
step = 2 ** 128 // shard_count
hash_ranges = itertools.chain(
map(lambda i: (i, i * step, (i + 1) * step), range(shard_count - 1)),
[(shard_count - 1, (shard_count - 1) * step, 2 ** 128)],
)
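        # Worked example: with shard_count=2, step is 2**127, so shard 0
        # covers [0, 2**127) and shard 1 covers [2**127, 2**128).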
for index, start, end in hash_ranges:
shard = Shard(index, start, end)
self.shards[shard.shard_id] = shard
@property
def arn(self):
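        # Note: real Kinesis stream ARNs use the form
        # arn:aws:kinesis:<region>:<account-id>:stream/<name>;
        # this mock omits the "stream/" prefix.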
return "arn:aws:kinesis:{region}:{account_number}:{stream_name}".format(
region=self.region,
account_number=self.account_number,
stream_name=self.stream_name,
)
def get_shard(self, shard_id):
if shard_id in self.shards:
return self.shards[shard_id]
else:
raise ShardNotFoundError(shard_id)
def get_shard_for_key(self, partition_key, explicit_hash_key):
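        """Route a record to a shard: hash the partition key with MD5 (or use
        the explicit hash key if one is given) and return the shard whose hash
        range contains that value."""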
if not isinstance(partition_key, six.string_types):
raise InvalidArgumentError("partition_key")
if len(partition_key) > 256:
raise InvalidArgumentError("partition_key")
if explicit_hash_key:
if not isinstance(explicit_hash_key, six.string_types):
raise InvalidArgumentError("explicit_hash_key")
key = int(explicit_hash_key)
if key >= 2 ** 128:
raise InvalidArgumentError("explicit_hash_key")
else:
key = int(md5(partition_key.encode("utf-8")).hexdigest(), 16)
for shard in self.shards.values():
if shard.starting_hash <= key < shard.ending_hash:
return shard
def put_record(
self, partition_key, explicit_hash_key, sequence_number_for_ordering, data
):
shard = self.get_shard_for_key(partition_key, explicit_hash_key)
sequence_number = shard.put_record(partition_key, data, explicit_hash_key)
return sequence_number, shard.shard_id
def to_json(self):
return {
"StreamDescription": {
"StreamARN": self.arn,
"StreamName": self.stream_name,
"StreamStatus": self.status,
"HasMoreShards": False,
"Shards": [shard.to_json() for shard in self.shards.values()],
}
}
def to_json_summary(self):
return {
"StreamDescriptionSummary": {
"StreamARN": self.arn,
"StreamName": self.stream_name,
"StreamStatus": self.status,
"StreamCreationTimestamp": six.text_type(self.creation_datetime),
"OpenShardCount": self.shard_count,
}
}
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
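        """Build a Stream from an AWS::Kinesis::Stream template: reads Name,
        ShardCount (default 1), and Region (default us-east-1) from the
        resource Properties."""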
properties = cloudformation_json["Properties"]
region = properties.get("Region", "us-east-1")
shard_count = properties.get("ShardCount", 1)
return Stream(properties["Name"], shard_count, region)
class FirehoseRecord(BaseModel):
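    """A record written to a Firehose delivery stream; the record id is a
    fixed placeholder value."""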
def __init__(self, record_data):
self.record_id = 12345678
self.record_data = record_data
class DeliveryStream(BaseModel):
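    """A Firehose delivery stream whose destination (S3, extended S3, or
    Redshift) is determined by which keyword arguments are supplied."""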
def __init__(self, stream_name, **stream_kwargs):
self.name = stream_name
self.redshift_username = stream_kwargs.get("redshift_username")
self.redshift_password = stream_kwargs.get("redshift_password")
self.redshift_jdbc_url = stream_kwargs.get("redshift_jdbc_url")
self.redshift_role_arn = stream_kwargs.get("redshift_role_arn")
self.redshift_copy_command = stream_kwargs.get("redshift_copy_command")
self.s3_config = stream_kwargs.get("s3_config")
self.extended_s3_config = stream_kwargs.get("extended_s3_config")
self.redshift_s3_role_arn = stream_kwargs.get("redshift_s3_role_arn")
self.redshift_s3_bucket_arn = stream_kwargs.get("redshift_s3_bucket_arn")
self.redshift_s3_prefix = stream_kwargs.get("redshift_s3_prefix")
self.redshift_s3_compression_format = stream_kwargs.get(
"redshift_s3_compression_format", "UNCOMPRESSED"
)
self.redshift_s3_buffering_hints = stream_kwargs.get(
"redshift_s3_buffering_hints"
)
self.records = []
self.status = "ACTIVE"
self.created_at = datetime.datetime.utcnow()
self.last_updated = datetime.datetime.utcnow()
@property
def arn(self):
return "arn:aws:firehose:us-east-1:{1}:deliverystream/{0}".format(
self.name, ACCOUNT_ID
)
def destinations_to_dict(self):
if self.s3_config:
return [
{"DestinationId": "string", "S3DestinationDescription": self.s3_config}
]
elif self.extended_s3_config:
return [
{
"DestinationId": "string",
"ExtendedS3DestinationDescription": self.extended_s3_config,
}
]
else:
return [
{
"DestinationId": "string",
"RedshiftDestinationDescription": {
"ClusterJDBCURL": self.redshift_jdbc_url,
"CopyCommand": self.redshift_copy_command,
"RoleARN": self.redshift_role_arn,
"S3DestinationDescription": {
"BucketARN": self.redshift_s3_bucket_arn,
"BufferingHints": self.redshift_s3_buffering_hints,
"CompressionFormat": self.redshift_s3_compression_format,
"Prefix": self.redshift_s3_prefix,
"RoleARN": self.redshift_s3_role_arn,
},
"Username": self.redshift_username,
},
}
]
def to_dict(self):
return {
"DeliveryStreamDescription": {
"CreateTimestamp": time.mktime(self.created_at.timetuple()),
"DeliveryStreamARN": self.arn,
"DeliveryStreamName": self.name,
"DeliveryStreamStatus": self.status,
"Destinations": self.destinations_to_dict(),
"HasMoreDestinations": False,
"LastUpdateTimestamp": time.mktime(self.last_updated.timetuple()),
"VersionId": "string",
}
}
def put_record(self, record_data):
record = FirehoseRecord(record_data)
self.records.append(record)
return record
class KinesisBackend(BaseBackend):
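    """Per-region backend holding the mocked Kinesis streams and Firehose
    delivery streams."""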
def __init__(self):
self.streams = OrderedDict()
self.delivery_streams = {}
def create_stream(self, stream_name, shard_count, region):
if stream_name in self.streams:
raise ResourceInUseError(stream_name)
stream = Stream(stream_name, shard_count, region)
self.streams[stream_name] = stream
return stream
def describe_stream(self, stream_name):
if stream_name in self.streams:
return self.streams[stream_name]
else:
raise StreamNotFoundError(stream_name)
def describe_stream_summary(self, stream_name):
return self.describe_stream(stream_name)
def list_streams(self):
return self.streams.values()
def delete_stream(self, stream_name):
if stream_name in self.streams:
return self.streams.pop(stream_name)
raise StreamNotFoundError(stream_name)
def get_shard_iterator(
self,
stream_name,
shard_id,
shard_iterator_type,
starting_sequence_number,
at_timestamp,
):
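        """Validate the stream and shard, then build an opaque iterator that
        encodes the starting read position (see compose_new_shard_iterator)."""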
# Validate params
stream = self.describe_stream(stream_name)
shard = stream.get_shard(shard_id)
shard_iterator = compose_new_shard_iterator(
stream_name,
shard,
shard_iterator_type,
starting_sequence_number,
at_timestamp,
)
return shard_iterator
def get_records(self, shard_iterator, limit):
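        """Decode the shard iterator, read up to `limit` records from that
        position, and return the next iterator, the records, and the
        MillisBehindLatest value."""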
decomposed = decompose_shard_iterator(shard_iterator)
stream_name, shard_id, last_sequence_id = decomposed
stream = self.describe_stream(stream_name)
shard = stream.get_shard(shard_id)
records, last_sequence_id, millis_behind_latest = shard.get_records(
last_sequence_id, limit
)
next_shard_iterator = compose_shard_iterator(
stream_name, shard, last_sequence_id
)
return next_shard_iterator, records, millis_behind_latest
def put_record(
self,
stream_name,
partition_key,
explicit_hash_key,
sequence_number_for_ordering,
data,
):
stream = self.describe_stream(stream_name)
sequence_number, shard_id = stream.put_record(
partition_key, explicit_hash_key, sequence_number_for_ordering, data
)
return sequence_number, shard_id
def put_records(self, stream_name, records):
stream = self.describe_stream(stream_name)
response = {"FailedRecordCount": 0, "Records": []}
for record in records:
partition_key = record.get("PartitionKey")
explicit_hash_key = record.get("ExplicitHashKey")
data = record.get("Data")
sequence_number, shard_id = stream.put_record(
partition_key, explicit_hash_key, None, data
)
response["Records"].append(
{"SequenceNumber": sequence_number, "ShardId": shard_id}
)
return response
def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
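        """Split a shard at `new_starting_hash_key`: the existing shard keeps
        the lower hash range, a new shard takes the upper range, and the split
        shard's records are redistributed via put_record."""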
stream = self.describe_stream(stream_name)
if shard_to_split not in stream.shards:
raise ResourceNotFoundError(shard_to_split)
        # The new starting hash key must be a decimal string; anchor the
        # pattern so values with trailing garbage are rejected.
        if not re.match(r"(0|[1-9]\d{0,38})$", new_starting_hash_key):
raise InvalidArgumentError(new_starting_hash_key)
new_starting_hash_key = int(new_starting_hash_key)
shard = stream.shards[shard_to_split]
last_id = sorted(stream.shards.values(), key=attrgetter("_shard_id"))[
-1
]._shard_id
if shard.starting_hash < new_starting_hash_key < shard.ending_hash:
new_shard = Shard(last_id + 1, new_starting_hash_key, shard.ending_hash)
shard.ending_hash = new_starting_hash_key
stream.shards[new_shard.shard_id] = new_shard
else:
raise InvalidArgumentError(new_starting_hash_key)
records = shard.records
shard.records = OrderedDict()
for index in records:
record = records[index]
stream.put_record(
record.partition_key, record.explicit_hash_key, None, record.data
)
def merge_shards(self, stream_name, shard_to_merge, adjacent_shard_to_merge):
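        """Merge two adjacent shards: `shard_to_merge` absorbs the hash range
        of `adjacent_shard_to_merge`, the adjacent shard is deleted, and its
        records are copied into the surviving shard."""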
stream = self.describe_stream(stream_name)
if shard_to_merge not in stream.shards:
raise ResourceNotFoundError(shard_to_merge)
if adjacent_shard_to_merge not in stream.shards:
raise ResourceNotFoundError(adjacent_shard_to_merge)
shard1 = stream.shards[shard_to_merge]
shard2 = stream.shards[adjacent_shard_to_merge]
if shard1.ending_hash == shard2.starting_hash:
shard1.ending_hash = shard2.ending_hash
elif shard2.ending_hash == shard1.starting_hash:
shard1.starting_hash = shard2.starting_hash
else:
raise InvalidArgumentError(adjacent_shard_to_merge)
del stream.shards[shard2.shard_id]
for index in shard2.records:
record = shard2.records[index]
shard1.put_record(
record.partition_key, record.data, record.explicit_hash_key
)
""" Firehose """
def create_delivery_stream(self, stream_name, **stream_kwargs):
stream = DeliveryStream(stream_name, **stream_kwargs)
self.delivery_streams[stream_name] = stream
return stream
def get_delivery_stream(self, stream_name):
if stream_name in self.delivery_streams:
return self.delivery_streams[stream_name]
else:
raise StreamNotFoundError(stream_name)
def list_delivery_streams(self):
return self.delivery_streams.values()
def delete_delivery_stream(self, stream_name):
        # Raises StreamNotFoundError (via get_delivery_stream) if the
        # delivery stream does not exist.
        self.get_delivery_stream(stream_name)
        return self.delivery_streams.pop(stream_name)
def put_firehose_record(self, stream_name, record_data):
stream = self.get_delivery_stream(stream_name)
record = stream.put_record(record_data)
return record
2019-10-31 15:44:26 +00:00
def list_tags_for_stream(
self, stream_name, exclusive_start_tag_key=None, limit=None
):
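        """Return the stream's tags sorted by key, honoring
        `exclusive_start_tag_key` and `limit`; HasMoreTags is set when the
        listing is truncated."""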
stream = self.describe_stream(stream_name)
tags = []
result = {"HasMoreTags": False, "Tags": tags}
for key, val in sorted(stream.tags.items(), key=lambda x: x[0]):
if limit and len(tags) >= limit:
result["HasMoreTags"] = True
break
if exclusive_start_tag_key and key < exclusive_start_tag_key:
continue
tags.append({"Key": key, "Value": val})
return result
def add_tags_to_stream(self, stream_name, tags):
stream = self.describe_stream(stream_name)
stream.tags.update(tags)
def remove_tags_from_stream(self, stream_name, tag_keys):
stream = self.describe_stream(stream_name)
for key in tag_keys:
if key in stream.tags:
del stream.tags[key]
kinesis_backends = {}
for region in Session().get_available_regions("kinesis"):
kinesis_backends[region] = KinesisBackend()
for region in Session().get_available_regions("kinesis", partition_name="aws-us-gov"):
kinesis_backends[region] = KinesisBackend()
for region in Session().get_available_regions("kinesis", partition_name="aws-cn"):
kinesis_backends[region] = KinesisBackend()
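
# A minimal usage sketch (an assumption, not part of this module): with moto's
# `mock_kinesis` decorator, boto3 Kinesis calls are served by the per-region
# KinesisBackend instances registered above.
#
#   import boto3
#   from moto import mock_kinesis
#
#   @mock_kinesis
#   def test_put_and_get():
#       client = boto3.client("kinesis", region_name="us-east-1")
#       client.create_stream(StreamName="my-stream", ShardCount=2)
#       client.put_record(StreamName="my-stream", Data=b"hello", PartitionKey="pk")
#       shard_id = client.describe_stream(StreamName="my-stream")[
#           "StreamDescription"]["Shards"][0]["ShardId"]
#       iterator = client.get_shard_iterator(
#           StreamName="my-stream", ShardId=shard_id,
#           ShardIteratorType="TRIM_HORIZON")["ShardIterator"]
#       records = client.get_records(ShardIterator=iterator)["Records"]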