moto/tests/test_kinesis/test_kinesis.py

1101 lines
37 KiB
Python

import datetime
import time
import pytest
import boto.kinesis
import boto3
from boto.kinesis.exceptions import ResourceNotFoundException, InvalidArgumentException
from botocore.exceptions import ClientError
from dateutil.tz import tzlocal
from moto import mock_kinesis, mock_kinesis_deprecated
from moto.core import ACCOUNT_ID
import sure # noqa # pylint: disable=unused-import
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_create_cluster():
conn = boto.kinesis.connect_to_region("us-west-2")
conn.create_stream("my_stream", 3)
stream_response = conn.describe_stream("my_stream")
stream = stream_response["StreamDescription"]
stream["StreamName"].should.equal("my_stream")
stream["HasMoreShards"].should.equal(False)
stream["StreamARN"].should.equal(
"arn:aws:kinesis:us-west-2:{}:stream/my_stream".format(ACCOUNT_ID)
)
stream["StreamStatus"].should.equal("ACTIVE")
shards = stream["Shards"]
shards.should.have.length_of(3)
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_describe_non_existent_stream():
conn = boto.kinesis.connect_to_region("us-east-1")
conn.describe_stream.when.called_with("not-a-stream").should.throw(
ResourceNotFoundException
)
@mock_kinesis
def test_describe_non_existent_stream_boto3():
client = boto3.client("kinesis", region_name="us-west-2")
with pytest.raises(ClientError) as exc:
client.describe_stream_summary(StreamName="not-a-stream")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal(
"Stream not-a-stream under account 123456789012 not found."
)
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_list_and_delete_stream():
conn = boto.kinesis.connect_to_region("us-west-2")
conn.create_stream("stream1", 1)
conn.create_stream("stream2", 1)
conn.list_streams()["StreamNames"].should.have.length_of(2)
conn.delete_stream("stream2")
conn.list_streams()["StreamNames"].should.have.length_of(1)
# Delete invalid id
conn.delete_stream.when.called_with("not-a-stream").should.throw(
ResourceNotFoundException
)
@mock_kinesis
def test_list_and_delete_stream_boto3():
client = boto3.client("kinesis", region_name="us-west-2")
client.list_streams()["StreamNames"].should.have.length_of(0)
client.create_stream(StreamName="stream1", ShardCount=1)
client.create_stream(StreamName="stream2", ShardCount=1)
client.list_streams()["StreamNames"].should.have.length_of(2)
client.delete_stream(StreamName="stream1")
client.list_streams()["StreamNames"].should.have.length_of(1)
@mock_kinesis
def test_delete_unknown_stream():
client = boto3.client("kinesis", region_name="us-west-2")
with pytest.raises(ClientError) as exc:
client.delete_stream(StreamName="not-a-stream")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal(
"Stream not-a-stream under account 123456789012 not found."
)
@mock_kinesis
def test_list_many_streams():
conn = boto3.client("kinesis", region_name="us-west-2")
for i in range(11):
conn.create_stream(StreamName="stream%d" % i, ShardCount=1)
resp = conn.list_streams()
stream_names = resp["StreamNames"]
has_more_streams = resp["HasMoreStreams"]
stream_names.should.have.length_of(10)
has_more_streams.should.be(True)
resp2 = conn.list_streams(ExclusiveStartStreamName=stream_names[-1])
stream_names = resp2["StreamNames"]
has_more_streams = resp2["HasMoreStreams"]
stream_names.should.have.length_of(1)
has_more_streams.should.equal(False)
@mock_kinesis
def test_describe_stream_summary():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream_summary"
shard_count = 5
conn.create_stream(StreamName=stream_name, ShardCount=shard_count)
resp = conn.describe_stream_summary(StreamName=stream_name)
stream = resp["StreamDescriptionSummary"]
stream["StreamName"].should.equal(stream_name)
stream["OpenShardCount"].should.equal(shard_count)
stream["StreamARN"].should.equal(
"arn:aws:kinesis:us-west-2:{}:stream/{}".format(ACCOUNT_ID, stream_name)
)
stream["StreamStatus"].should.equal("ACTIVE")
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_basic_shard_iterator():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
response = conn.describe_stream(stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
shard_iterator = response["ShardIterator"]
response = conn.get_records(shard_iterator)
shard_iterator = response["NextShardIterator"]
response["Records"].should.equal([])
response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_basic_shard_iterator_boto3():
client = boto3.client("kinesis", region_name="us-west-1")
stream_name = "mystream"
client.create_stream(StreamName=stream_name, ShardCount=1)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shard_id = stream["Shards"][0]["ShardId"]
resp = client.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = resp["ShardIterator"]
resp = client.get_records(ShardIterator=shard_iterator)
resp.should.have.key("Records").length_of(0)
resp.should.have.key("MillisBehindLatest").equal(0)
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_get_invalid_shard_iterator():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
conn.get_shard_iterator.when.called_with(
stream_name, "123", "TRIM_HORIZON"
).should.throw(ResourceNotFoundException)
@mock_kinesis
def test_get_invalid_shard_iterator_boto3():
client = boto3.client("kinesis", region_name="us-west-1")
stream_name = "mystream"
client.create_stream(StreamName=stream_name, ShardCount=1)
with pytest.raises(ClientError) as exc:
client.get_shard_iterator(
StreamName=stream_name, ShardId="123", ShardIteratorType="TRIM_HORIZON"
)
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
# There is some magic in AWS, that '123' is automatically converted into 'shardId-000000000123'
# AWS itself returns this normalized ID in the error message, not the given id
err["Message"].should.equal(
f"Shard 123 in stream {stream_name} under account {ACCOUNT_ID} does not exist"
)
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_put_records():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
data = "hello world"
partition_key = "1234"
conn.put_record.when.called_with(stream_name, data, 1234).should.throw(
InvalidArgumentException
)
response = conn.put_record(stream_name, data, partition_key)
response["SequenceNumber"].should.equal("1")
response = conn.describe_stream(stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
shard_iterator = response["ShardIterator"]
response = conn.get_records(shard_iterator)
shard_iterator = response["NextShardIterator"]
response["Records"].should.have.length_of(1)
record = response["Records"][0]
record["Data"].should.equal("hello world")
record["PartitionKey"].should.equal("1234")
record["SequenceNumber"].should.equal("1")
@mock_kinesis
def test_put_records_boto3():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=1)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shard_id = stream["Shards"][0]["ShardId"]
data = b"hello world"
partition_key = "1234"
response = client.put_record(
StreamName=stream_name, Data=data, PartitionKey=partition_key
)
response["SequenceNumber"].should.equal("1")
resp = client.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = resp["ShardIterator"]
resp = client.get_records(ShardIterator=shard_iterator)
resp["Records"].should.have.length_of(1)
record = resp["Records"][0]
record["Data"].should.equal(b"hello world")
record["PartitionKey"].should.equal("1234")
record["SequenceNumber"].should.equal("1")
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_get_records_limit():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
# Create some data
data = "hello world"
for index in range(5):
conn.put_record(stream_name, data, str(index))
# Get a shard iterator
response = conn.describe_stream(stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
shard_iterator = response["ShardIterator"]
# Retrieve only 3 records
response = conn.get_records(shard_iterator, limit=3)
response["Records"].should.have.length_of(3)
# Then get the rest of the results
next_shard_iterator = response["NextShardIterator"]
response = conn.get_records(next_shard_iterator)
response["Records"].should.have.length_of(2)
@mock_kinesis
def test_get_records_limit_boto3():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=1)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shard_id = stream["Shards"][0]["ShardId"]
data = b"hello world"
for index in range(5):
client.put_record(StreamName=stream_name, Data=data, PartitionKey=str(index))
resp = client.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = resp["ShardIterator"]
# Retrieve only 3 records
resp = client.get_records(ShardIterator=shard_iterator, Limit=3)
resp["Records"].should.have.length_of(3)
# Then get the rest of the results
next_shard_iterator = resp["NextShardIterator"]
response = client.get_records(ShardIterator=next_shard_iterator)
response["Records"].should.have.length_of(2)
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_get_records_at_sequence_number():
# AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by
# a specific sequence number.
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
# Create some data
for index in range(1, 5):
conn.put_record(stream_name, str(index), str(index))
# Get a shard iterator
response = conn.describe_stream(stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
shard_iterator = response["ShardIterator"]
# Get the second record
response = conn.get_records(shard_iterator, limit=2)
second_sequence_id = response["Records"][1]["SequenceNumber"]
# Then get a new iterator starting at that id
response = conn.get_shard_iterator(
stream_name, shard_id, "AT_SEQUENCE_NUMBER", second_sequence_id
)
shard_iterator = response["ShardIterator"]
response = conn.get_records(shard_iterator)
# And the first result returned should be the second item
response["Records"][0]["SequenceNumber"].should.equal(second_sequence_id)
response["Records"][0]["Data"].should.equal("2")
@mock_kinesis
def test_get_records_at_sequence_number_boto3():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=1)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shard_id = stream["Shards"][0]["ShardId"]
for index in range(1, 5):
client.put_record(
StreamName=stream_name,
Data=f"data_{index}".encode("utf-8"),
PartitionKey=str(index),
)
resp = client.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = resp["ShardIterator"]
# Retrieve only 2 records
resp = client.get_records(ShardIterator=shard_iterator, Limit=2)
sequence_nr = resp["Records"][1]["SequenceNumber"]
# Then get a new iterator starting at that id
resp = client.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AT_SEQUENCE_NUMBER",
StartingSequenceNumber=sequence_nr,
)
shard_iterator = resp["ShardIterator"]
resp = client.get_records(ShardIterator=shard_iterator)
# And the first result returned should be the second item
resp["Records"][0]["SequenceNumber"].should.equal(sequence_nr)
resp["Records"][0]["Data"].should.equal(b"data_2")
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_get_records_after_sequence_number():
# AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted
# by a specific sequence number.
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
# Create some data
for index in range(1, 5):
conn.put_record(stream_name, str(index), str(index))
# Get a shard iterator
response = conn.describe_stream(stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
shard_iterator = response["ShardIterator"]
# Get the second record
response = conn.get_records(shard_iterator, limit=2)
second_sequence_id = response["Records"][1]["SequenceNumber"]
# Then get a new iterator starting after that id
response = conn.get_shard_iterator(
stream_name, shard_id, "AFTER_SEQUENCE_NUMBER", second_sequence_id
)
shard_iterator = response["ShardIterator"]
response = conn.get_records(shard_iterator)
# And the first result returned should be the third item
response["Records"][0]["Data"].should.equal("3")
response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_get_records_after_sequence_number_boto3():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=1)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shard_id = stream["Shards"][0]["ShardId"]
for index in range(1, 5):
client.put_record(
StreamName=stream_name,
Data=f"data_{index}".encode("utf-8"),
PartitionKey=str(index),
)
resp = client.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = resp["ShardIterator"]
# Retrieve only 2 records
resp = client.get_records(ShardIterator=shard_iterator, Limit=2)
sequence_nr = resp["Records"][1]["SequenceNumber"]
# Then get a new iterator starting at that id
resp = client.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AFTER_SEQUENCE_NUMBER",
StartingSequenceNumber=sequence_nr,
)
shard_iterator = resp["ShardIterator"]
resp = client.get_records(ShardIterator=shard_iterator)
# And the first result returned should be the second item
resp["Records"][0]["SequenceNumber"].should.equal("3")
resp["Records"][0]["Data"].should.equal(b"data_3")
resp["MillisBehindLatest"].should.equal(0)
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_get_records_latest():
# LATEST - Start reading just after the most recent record in the shard,
# so that you always read the most recent data in the shard.
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
# Create some data
for index in range(1, 5):
conn.put_record(stream_name, str(index), str(index))
# Get a shard iterator
response = conn.describe_stream(stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")
shard_iterator = response["ShardIterator"]
# Get the second record
response = conn.get_records(shard_iterator, limit=2)
second_sequence_id = response["Records"][1]["SequenceNumber"]
# Then get a new iterator starting after that id
response = conn.get_shard_iterator(
stream_name, shard_id, "LATEST", second_sequence_id
)
shard_iterator = response["ShardIterator"]
# Write some more data
conn.put_record(stream_name, "last_record", "last_record")
response = conn.get_records(shard_iterator)
# And the only result returned should be the new item
response["Records"].should.have.length_of(1)
response["Records"][0]["PartitionKey"].should.equal("last_record")
response["Records"][0]["Data"].should.equal("last_record")
response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_get_records_latest_boto3():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=1)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shard_id = stream["Shards"][0]["ShardId"]
for index in range(1, 5):
client.put_record(
StreamName=stream_name,
Data=f"data_{index}".encode("utf-8"),
PartitionKey=str(index),
)
resp = client.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = resp["ShardIterator"]
# Retrieve only 2 records
resp = client.get_records(ShardIterator=shard_iterator, Limit=2)
sequence_nr = resp["Records"][1]["SequenceNumber"]
# Then get a new iterator starting at that id
resp = client.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="LATEST",
StartingSequenceNumber=sequence_nr,
)
shard_iterator = resp["ShardIterator"]
client.put_record(
StreamName=stream_name, Data=b"last_record", PartitionKey="last_record"
)
resp = client.get_records(ShardIterator=shard_iterator)
# And the first result returned should be the second item
resp["Records"].should.have.length_of(1)
resp["Records"][0]["SequenceNumber"].should.equal("5")
resp["Records"][0]["PartitionKey"].should.equal("last_record")
resp["Records"][0]["Data"].should.equal(b"last_record")
resp["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_get_records_at_timestamp():
# AT_TIMESTAMP - Read the first record at or after the specified timestamp
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
# Create some data
for index in range(1, 5):
conn.put_record(
StreamName=stream_name, Data=str(index), PartitionKey=str(index)
)
# When boto3 floors the timestamp that we pass to get_shard_iterator to
# second precision even though AWS supports ms precision:
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
# To test around this limitation we wait until we well into the next second
# before capturing the time and storing the records we expect to retrieve.
time.sleep(1.0)
timestamp = datetime.datetime.utcnow()
keys = [str(i) for i in range(5, 10)]
for k in keys:
conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)
# Get a shard iterator
response = conn.describe_stream(StreamName=stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AT_TIMESTAMP",
Timestamp=timestamp,
)
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(len(keys))
partition_keys = [r["PartitionKey"] for r in response["Records"]]
partition_keys.should.equal(keys)
response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_get_records_at_very_old_timestamp():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
# Create some data
keys = [str(i) for i in range(1, 5)]
for k in keys:
conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)
# Get a shard iterator
response = conn.describe_stream(StreamName=stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AT_TIMESTAMP",
Timestamp=1,
)
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(len(keys))
partition_keys = [r["PartitionKey"] for r in response["Records"]]
partition_keys.should.equal(keys)
response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_get_records_timestamp_filtering():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.put_record(StreamName=stream_name, Data="0", PartitionKey="0")
time.sleep(1.0)
timestamp = datetime.datetime.now(tz=tzlocal())
conn.put_record(StreamName=stream_name, Data="1", PartitionKey="1")
response = conn.describe_stream(StreamName=stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AT_TIMESTAMP",
Timestamp=timestamp,
)
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(1)
response["Records"][0]["PartitionKey"].should.equal("1")
response["Records"][0]["ApproximateArrivalTimestamp"].should.be.greater_than(
timestamp
)
response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_get_records_millis_behind_latest():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.put_record(StreamName=stream_name, Data="0", PartitionKey="0")
time.sleep(1.0)
conn.put_record(StreamName=stream_name, Data="1", PartitionKey="1")
response = conn.describe_stream(StreamName=stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
)
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator, Limit=1)
response["Records"].should.have.length_of(1)
response["MillisBehindLatest"].should.be.greater_than(0)
@mock_kinesis
def test_get_records_at_very_new_timestamp():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
# Create some data
keys = [str(i) for i in range(1, 5)]
for k in keys:
conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)
timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=1)
# Get a shard iterator
response = conn.describe_stream(StreamName=stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AT_TIMESTAMP",
Timestamp=timestamp,
)
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(0)
response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_get_records_from_empty_stream_at_timestamp():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
timestamp = datetime.datetime.utcnow()
# Get a shard iterator
response = conn.describe_stream(StreamName=stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType="AT_TIMESTAMP",
Timestamp=timestamp,
)
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(0)
response["MillisBehindLatest"].should.equal(0)
@mock_kinesis
def test_valid_increase_stream_retention_period():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=40
)
response = conn.describe_stream(StreamName=stream_name)
response["StreamDescription"]["RetentionPeriodHours"].should.equal(40)
@mock_kinesis
def test_invalid_increase_stream_retention_period():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "my_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=30
)
with pytest.raises(ClientError) as ex:
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=20
)
ex.value.response["Error"]["Code"].should.equal("InvalidArgumentException")
ex.value.response["Error"]["Message"].should.equal(20)
@mock_kinesis
def test_valid_decrease_stream_retention_period():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "decrease_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=30
)
conn.decrease_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=25
)
response = conn.describe_stream(StreamName=stream_name)
response["StreamDescription"]["RetentionPeriodHours"].should.equal(25)
@mock_kinesis
def test_invalid_decrease_stream_retention_period():
conn = boto3.client("kinesis", region_name="us-west-2")
stream_name = "decrease_stream"
conn.create_stream(StreamName=stream_name, ShardCount=1)
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=30
)
with pytest.raises(ClientError) as ex:
conn.decrease_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=20
)
ex.value.response["Error"]["Code"].should.equal("InvalidArgumentException")
ex.value.response["Error"]["Message"].should.equal(20)
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_invalid_shard_iterator_type():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
response = conn.describe_stream(stream_name)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = conn.get_shard_iterator.when.called_with(
stream_name, shard_id, "invalid-type"
).should.throw(InvalidArgumentException)
@mock_kinesis
def test_invalid_shard_iterator_type_boto3():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=1)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shard_id = stream["Shards"][0]["ShardId"]
with pytest.raises(ClientError) as exc:
client.get_shard_iterator(
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="invalid-type"
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal("Invalid ShardIteratorType: invalid-type")
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_add_tags():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
conn.describe_stream(stream_name)
conn.add_tags_to_stream(stream_name, {"tag1": "val1"})
conn.add_tags_to_stream(stream_name, {"tag2": "val2"})
conn.add_tags_to_stream(stream_name, {"tag1": "val3"})
conn.add_tags_to_stream(stream_name, {"tag2": "val4"})
@mock_kinesis
def test_add_list_remove_tags_boto3():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=1)
client.add_tags_to_stream(
StreamName=stream_name,
Tags={"tag1": "val1", "tag2": "val2", "tag3": "val3", "tag4": "val4",},
)
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
tags.should.have.length_of(4)
tags.should.contain({"Key": "tag1", "Value": "val1"})
tags.should.contain({"Key": "tag2", "Value": "val2"})
tags.should.contain({"Key": "tag3", "Value": "val3"})
tags.should.contain({"Key": "tag4", "Value": "val4"})
client.add_tags_to_stream(StreamName=stream_name, Tags={"tag5": "val5"})
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
tags.should.have.length_of(5)
tags.should.contain({"Key": "tag5", "Value": "val5"})
client.remove_tags_from_stream(StreamName=stream_name, TagKeys=["tag2", "tag3"])
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
tags.should.have.length_of(3)
tags.should.contain({"Key": "tag1", "Value": "val1"})
tags.should.contain({"Key": "tag4", "Value": "val4"})
tags.should.contain({"Key": "tag5", "Value": "val5"})
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_list_tags():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
conn.describe_stream(stream_name)
conn.add_tags_to_stream(stream_name, {"tag1": "val1"})
tags = dict(
[
(tag["Key"], tag["Value"])
for tag in conn.list_tags_for_stream(stream_name)["Tags"]
]
)
tags.get("tag1").should.equal("val1")
conn.add_tags_to_stream(stream_name, {"tag2": "val2"})
tags = dict(
[
(tag["Key"], tag["Value"])
for tag in conn.list_tags_for_stream(stream_name)["Tags"]
]
)
tags.get("tag2").should.equal("val2")
conn.add_tags_to_stream(stream_name, {"tag1": "val3"})
tags = dict(
[
(tag["Key"], tag["Value"])
for tag in conn.list_tags_for_stream(stream_name)["Tags"]
]
)
tags.get("tag1").should.equal("val3")
conn.add_tags_to_stream(stream_name, {"tag2": "val4"})
tags = dict(
[
(tag["Key"], tag["Value"])
for tag in conn.list_tags_for_stream(stream_name)["Tags"]
]
)
tags.get("tag2").should.equal("val4")
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_remove_tags():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 1)
conn.describe_stream(stream_name)
conn.add_tags_to_stream(stream_name, {"tag1": "val1"})
tags = dict(
[
(tag["Key"], tag["Value"])
for tag in conn.list_tags_for_stream(stream_name)["Tags"]
]
)
tags.get("tag1").should.equal("val1")
conn.remove_tags_from_stream(stream_name, ["tag1"])
tags = dict(
[
(tag["Key"], tag["Value"])
for tag in conn.list_tags_for_stream(stream_name)["Tags"]
]
)
tags.get("tag1").should.equal(None)
conn.add_tags_to_stream(stream_name, {"tag2": "val2"})
tags = dict(
[
(tag["Key"], tag["Value"])
for tag in conn.list_tags_for_stream(stream_name)["Tags"]
]
)
tags.get("tag2").should.equal("val2")
conn.remove_tags_from_stream(stream_name, ["tag2"])
tags = dict(
[
(tag["Key"], tag["Value"])
for tag in conn.list_tags_for_stream(stream_name)["Tags"]
]
)
tags.get("tag2").should.equal(None)
# Has boto3 equivalent
@mock_kinesis_deprecated
def test_merge_shards():
conn = boto.kinesis.connect_to_region("us-west-2")
stream_name = "my_stream"
conn.create_stream(stream_name, 4)
# Create some data
for index in range(1, 100):
conn.put_record(stream_name, str(index), str(index))
stream_response = conn.describe_stream(stream_name)
stream = stream_response["StreamDescription"]
shards = stream["Shards"]
shards.should.have.length_of(4)
conn.merge_shards.when.called_with(
stream_name, "shardId-000000000000", "shardId-000000000002"
).should.throw(InvalidArgumentException)
stream_response = conn.describe_stream(stream_name)
stream = stream_response["StreamDescription"]
shards = stream["Shards"]
shards.should.have.length_of(4)
conn.merge_shards(stream_name, "shardId-000000000000", "shardId-000000000001")
stream_response = conn.describe_stream(stream_name)
stream = stream_response["StreamDescription"]
shards = stream["Shards"]
active_shards = [
shard
for shard in shards
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
active_shards.should.have.length_of(3)
conn.merge_shards(stream_name, "shardId-000000000002", "shardId-000000000000")
stream_response = conn.describe_stream(stream_name)
stream = stream_response["StreamDescription"]
shards = stream["Shards"]
active_shards = [
shard
for shard in shards
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
active_shards.should.have.length_of(2)
@mock_kinesis
def test_merge_shards_boto3():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=4)
for index in range(1, 100):
client.put_record(
StreamName=stream_name,
Data=f"data_{index}".encode("utf-8"),
PartitionKey=str(index),
)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shards = stream["Shards"]
shards.should.have.length_of(4)
client.merge_shards(
StreamName=stream_name,
ShardToMerge="shardId-000000000000",
AdjacentShardToMerge="shardId-000000000001",
)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shards = stream["Shards"]
active_shards = [
shard
for shard in shards
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
active_shards.should.have.length_of(3)
client.merge_shards(
StreamName=stream_name,
ShardToMerge="shardId-000000000002",
AdjacentShardToMerge="shardId-000000000000",
)
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shards = stream["Shards"]
active_shards = [
shard
for shard in shards
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
active_shards.should.have.length_of(2)
@mock_kinesis
def test_merge_shards_invalid_arg():
client = boto3.client("kinesis", region_name="eu-west-2")
stream_name = "my_stream_summary"
client.create_stream(StreamName=stream_name, ShardCount=4)
with pytest.raises(ClientError) as exc:
client.merge_shards(
StreamName=stream_name,
ShardToMerge="shardId-000000000000",
AdjacentShardToMerge="shardId-000000000002",
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal("shardId-000000000002")