Kinesis - duplicate deprecated tests (#4371)
This commit is contained in:
parent
b49ee71a55
commit
d8f3d3f3e1
@ -17,6 +17,7 @@ from moto.core import ACCOUNT_ID
|
|||||||
import sure # noqa
|
import sure # noqa
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_create_cluster():
|
def test_create_cluster():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -37,6 +38,7 @@ def test_create_cluster():
|
|||||||
shards.should.have.length_of(3)
|
shards.should.have.length_of(3)
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_describe_non_existent_stream():
|
def test_describe_non_existent_stream():
|
||||||
conn = boto.kinesis.connect_to_region("us-east-1")
|
conn = boto.kinesis.connect_to_region("us-east-1")
|
||||||
@ -45,6 +47,19 @@ def test_describe_non_existent_stream():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_describe_non_existent_stream_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.describe_stream_summary(StreamName="not-a-stream")
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("ResourceNotFoundException")
|
||||||
|
err["Message"].should.equal(
|
||||||
|
"Stream not-a-stream under account 123456789012 not found."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_list_and_delete_stream():
|
def test_list_and_delete_stream():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -64,6 +79,31 @@ def test_list_and_delete_stream():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_list_and_delete_stream_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
client.list_streams()["StreamNames"].should.have.length_of(0)
|
||||||
|
|
||||||
|
client.create_stream(StreamName="stream1", ShardCount=1)
|
||||||
|
client.create_stream(StreamName="stream2", ShardCount=1)
|
||||||
|
client.list_streams()["StreamNames"].should.have.length_of(2)
|
||||||
|
|
||||||
|
client.delete_stream(StreamName="stream1")
|
||||||
|
client.list_streams()["StreamNames"].should.have.length_of(1)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_delete_unknown_stream():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.delete_stream(StreamName="not-a-stream")
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("ResourceNotFoundException")
|
||||||
|
err["Message"].should.equal(
|
||||||
|
"Stream not-a-stream under account 123456789012 not found."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis
|
@mock_kinesis
|
||||||
def test_list_many_streams():
|
def test_list_many_streams():
|
||||||
conn = boto3.client("kinesis", region_name="us-west-2")
|
conn = boto3.client("kinesis", region_name="us-west-2")
|
||||||
@ -101,6 +141,7 @@ def test_describe_stream_summary():
|
|||||||
stream["StreamStatus"].should.equal("ACTIVE")
|
stream["StreamStatus"].should.equal("ACTIVE")
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_basic_shard_iterator():
|
def test_basic_shard_iterator():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -120,6 +161,26 @@ def test_basic_shard_iterator():
|
|||||||
response["MillisBehindLatest"].should.equal(0)
|
response["MillisBehindLatest"].should.equal(0)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_basic_shard_iterator_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-1")
|
||||||
|
|
||||||
|
stream_name = "mystream"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shard_id = stream["Shards"][0]["ShardId"]
|
||||||
|
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator)
|
||||||
|
resp.should.have.key("Records").length_of(0)
|
||||||
|
resp.should.have.key("MillisBehindLatest").equal(0)
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_get_invalid_shard_iterator():
|
def test_get_invalid_shard_iterator():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -132,6 +193,23 @@ def test_get_invalid_shard_iterator():
|
|||||||
).should.throw(ResourceNotFoundException)
|
).should.throw(ResourceNotFoundException)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_get_invalid_shard_iterator_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-1")
|
||||||
|
|
||||||
|
stream_name = "mystream"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.get_shard_iterator(
|
||||||
|
StreamName=stream_name, ShardId="123", ShardIteratorType="TRIM_HORIZON"
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("ResourceNotFoundException")
|
||||||
|
err["Message"].should.equal("Shard 123 under account 123456789012 not found.")
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_put_records():
|
def test_put_records():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -164,6 +242,35 @@ def test_put_records():
|
|||||||
record["SequenceNumber"].should.equal("1")
|
record["SequenceNumber"].should.equal("1")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_put_records_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shard_id = stream["Shards"][0]["ShardId"]
|
||||||
|
|
||||||
|
data = b"hello world"
|
||||||
|
partition_key = "1234"
|
||||||
|
|
||||||
|
client.put_record(StreamName=stream_name, Data=data, PartitionKey=partition_key)
|
||||||
|
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator)
|
||||||
|
resp["Records"].should.have.length_of(1)
|
||||||
|
record = resp["Records"][0]
|
||||||
|
|
||||||
|
record["Data"].should.equal(b"hello world")
|
||||||
|
record["PartitionKey"].should.equal("1234")
|
||||||
|
record["SequenceNumber"].should.equal("1")
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_get_records_limit():
|
def test_get_records_limit():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -193,6 +300,36 @@ def test_get_records_limit():
|
|||||||
response["Records"].should.have.length_of(2)
|
response["Records"].should.have.length_of(2)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_get_records_limit_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shard_id = stream["Shards"][0]["ShardId"]
|
||||||
|
|
||||||
|
data = b"hello world"
|
||||||
|
|
||||||
|
for index in range(5):
|
||||||
|
client.put_record(StreamName=stream_name, Data=data, PartitionKey=str(index))
|
||||||
|
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
# Retrieve only 3 records
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator, Limit=3)
|
||||||
|
resp["Records"].should.have.length_of(3)
|
||||||
|
|
||||||
|
# Then get the rest of the results
|
||||||
|
next_shard_iterator = resp["NextShardIterator"]
|
||||||
|
response = client.get_records(ShardIterator=next_shard_iterator)
|
||||||
|
response["Records"].should.have.length_of(2)
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_get_records_at_sequence_number():
|
def test_get_records_at_sequence_number():
|
||||||
# AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by
|
# AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by
|
||||||
@ -227,6 +364,46 @@ def test_get_records_at_sequence_number():
|
|||||||
response["Records"][0]["Data"].should.equal("2")
|
response["Records"][0]["Data"].should.equal("2")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_get_records_at_sequence_number_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shard_id = stream["Shards"][0]["ShardId"]
|
||||||
|
|
||||||
|
for index in range(1, 5):
|
||||||
|
client.put_record(
|
||||||
|
StreamName=stream_name,
|
||||||
|
Data=f"data_{index}".encode("utf-8"),
|
||||||
|
PartitionKey=str(index),
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
# Retrieve only 2 records
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator, Limit=2)
|
||||||
|
sequence_nr = resp["Records"][1]["SequenceNumber"]
|
||||||
|
|
||||||
|
# Then get a new iterator starting at that id
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardId=shard_id,
|
||||||
|
ShardIteratorType="AT_SEQUENCE_NUMBER",
|
||||||
|
StartingSequenceNumber=sequence_nr,
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator)
|
||||||
|
# And the first result returned should be the second item
|
||||||
|
resp["Records"][0]["SequenceNumber"].should.equal(sequence_nr)
|
||||||
|
resp["Records"][0]["Data"].should.equal(b"data_2")
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_get_records_after_sequence_number():
|
def test_get_records_after_sequence_number():
|
||||||
# AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted
|
# AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted
|
||||||
@ -261,6 +438,47 @@ def test_get_records_after_sequence_number():
|
|||||||
response["MillisBehindLatest"].should.equal(0)
|
response["MillisBehindLatest"].should.equal(0)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_get_records_after_sequence_number_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shard_id = stream["Shards"][0]["ShardId"]
|
||||||
|
|
||||||
|
for index in range(1, 5):
|
||||||
|
client.put_record(
|
||||||
|
StreamName=stream_name,
|
||||||
|
Data=f"data_{index}".encode("utf-8"),
|
||||||
|
PartitionKey=str(index),
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
# Retrieve only 2 records
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator, Limit=2)
|
||||||
|
sequence_nr = resp["Records"][1]["SequenceNumber"]
|
||||||
|
|
||||||
|
# Then get a new iterator starting at that id
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardId=shard_id,
|
||||||
|
ShardIteratorType="AFTER_SEQUENCE_NUMBER",
|
||||||
|
StartingSequenceNumber=sequence_nr,
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator)
|
||||||
|
# And the first result returned should be the second item
|
||||||
|
resp["Records"][0]["SequenceNumber"].should.equal("3")
|
||||||
|
resp["Records"][0]["Data"].should.equal(b"data_3")
|
||||||
|
resp["MillisBehindLatest"].should.equal(0)
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_get_records_latest():
|
def test_get_records_latest():
|
||||||
# LATEST - Start reading just after the most recent record in the shard,
|
# LATEST - Start reading just after the most recent record in the shard,
|
||||||
@ -300,6 +518,52 @@ def test_get_records_latest():
|
|||||||
response["MillisBehindLatest"].should.equal(0)
|
response["MillisBehindLatest"].should.equal(0)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_get_records_latest_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shard_id = stream["Shards"][0]["ShardId"]
|
||||||
|
|
||||||
|
for index in range(1, 5):
|
||||||
|
client.put_record(
|
||||||
|
StreamName=stream_name,
|
||||||
|
Data=f"data_{index}".encode("utf-8"),
|
||||||
|
PartitionKey=str(index),
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
# Retrieve only 2 records
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator, Limit=2)
|
||||||
|
sequence_nr = resp["Records"][1]["SequenceNumber"]
|
||||||
|
|
||||||
|
# Then get a new iterator starting at that id
|
||||||
|
resp = client.get_shard_iterator(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardId=shard_id,
|
||||||
|
ShardIteratorType="LATEST",
|
||||||
|
StartingSequenceNumber=sequence_nr,
|
||||||
|
)
|
||||||
|
shard_iterator = resp["ShardIterator"]
|
||||||
|
|
||||||
|
client.put_record(
|
||||||
|
StreamName=stream_name, Data=b"last_record", PartitionKey="last_record"
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.get_records(ShardIterator=shard_iterator)
|
||||||
|
# And the first result returned should be the second item
|
||||||
|
resp["Records"].should.have.length_of(1)
|
||||||
|
resp["Records"][0]["SequenceNumber"].should.equal("5")
|
||||||
|
resp["Records"][0]["PartitionKey"].should.equal("last_record")
|
||||||
|
resp["Records"][0]["Data"].should.equal(b"last_record")
|
||||||
|
resp["MillisBehindLatest"].should.equal(0)
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis
|
@mock_kinesis
|
||||||
def test_get_records_at_timestamp():
|
def test_get_records_at_timestamp():
|
||||||
# AT_TIMESTAMP - Read the first record at or after the specified timestamp
|
# AT_TIMESTAMP - Read the first record at or after the specified timestamp
|
||||||
@ -547,6 +811,7 @@ def test_invalid_decrease_stream_retention_period():
|
|||||||
ex.value.response["Error"]["Message"].should.equal(20)
|
ex.value.response["Error"]["Message"].should.equal(20)
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_invalid_shard_iterator_type():
|
def test_invalid_shard_iterator_type():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -560,6 +825,24 @@ def test_invalid_shard_iterator_type():
|
|||||||
).should.throw(InvalidArgumentException)
|
).should.throw(InvalidArgumentException)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_invalid_shard_iterator_type_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shard_id = stream["Shards"][0]["ShardId"]
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.get_shard_iterator(
|
||||||
|
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="invalid-type"
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("InvalidArgumentException")
|
||||||
|
err["Message"].should.equal("Invalid ShardIteratorType: invalid-type")
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_add_tags():
|
def test_add_tags():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -573,6 +856,39 @@ def test_add_tags():
|
|||||||
conn.add_tags_to_stream(stream_name, {"tag2": "val4"})
|
conn.add_tags_to_stream(stream_name, {"tag2": "val4"})
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_add_list_remove_tags_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=1)
|
||||||
|
client.add_tags_to_stream(
|
||||||
|
StreamName=stream_name,
|
||||||
|
Tags={"tag1": "val1", "tag2": "val2", "tag3": "val3", "tag4": "val4",},
|
||||||
|
)
|
||||||
|
|
||||||
|
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
|
||||||
|
tags.should.have.length_of(4)
|
||||||
|
tags.should.contain({"Key": "tag1", "Value": "val1"})
|
||||||
|
tags.should.contain({"Key": "tag2", "Value": "val2"})
|
||||||
|
tags.should.contain({"Key": "tag3", "Value": "val3"})
|
||||||
|
tags.should.contain({"Key": "tag4", "Value": "val4"})
|
||||||
|
|
||||||
|
client.add_tags_to_stream(StreamName=stream_name, Tags={"tag5": "val5"})
|
||||||
|
|
||||||
|
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
|
||||||
|
tags.should.have.length_of(5)
|
||||||
|
tags.should.contain({"Key": "tag5", "Value": "val5"})
|
||||||
|
|
||||||
|
client.remove_tags_from_stream(StreamName=stream_name, TagKeys=["tag2", "tag3"])
|
||||||
|
|
||||||
|
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
|
||||||
|
tags.should.have.length_of(3)
|
||||||
|
tags.should.contain({"Key": "tag1", "Value": "val1"})
|
||||||
|
tags.should.contain({"Key": "tag4", "Value": "val4"})
|
||||||
|
tags.should.contain({"Key": "tag5", "Value": "val5"})
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_list_tags():
|
def test_list_tags():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -614,6 +930,7 @@ def test_list_tags():
|
|||||||
tags.get("tag2").should.equal("val4")
|
tags.get("tag2").should.equal("val4")
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_remove_tags():
|
def test_remove_tags():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -656,6 +973,7 @@ def test_remove_tags():
|
|||||||
tags.get("tag2").should.equal(None)
|
tags.get("tag2").should.equal(None)
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_split_shard():
|
def test_split_shard():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -698,6 +1016,53 @@ def test_split_shard():
|
|||||||
shards.should.have.length_of(4)
|
shards.should.have.length_of(4)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_split_shard_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=2)
|
||||||
|
|
||||||
|
for index in range(1, 100):
|
||||||
|
client.put_record(
|
||||||
|
StreamName=stream_name,
|
||||||
|
Data=f"data_{index}".encode("utf-8"),
|
||||||
|
PartitionKey=str(index),
|
||||||
|
)
|
||||||
|
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shards = stream["Shards"]
|
||||||
|
shards.should.have.length_of(2)
|
||||||
|
|
||||||
|
shard_range = shards[0]["HashKeyRange"]
|
||||||
|
new_starting_hash = (
|
||||||
|
int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
|
||||||
|
) // 2
|
||||||
|
client.split_shard(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardToSplit=shards[0]["ShardId"],
|
||||||
|
NewStartingHashKey=str(new_starting_hash),
|
||||||
|
)
|
||||||
|
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shards = stream["Shards"]
|
||||||
|
shards.should.have.length_of(3)
|
||||||
|
|
||||||
|
shard_range = shards[2]["HashKeyRange"]
|
||||||
|
new_starting_hash = (
|
||||||
|
int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
|
||||||
|
) // 2
|
||||||
|
client.split_shard(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardToSplit=shards[2]["ShardId"],
|
||||||
|
NewStartingHashKey=str(new_starting_hash),
|
||||||
|
)
|
||||||
|
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shards = stream["Shards"]
|
||||||
|
shards.should.have.length_of(4)
|
||||||
|
|
||||||
|
|
||||||
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_merge_shards():
|
def test_merge_shards():
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
conn = boto.kinesis.connect_to_region("us-west-2")
|
||||||
@ -751,3 +1116,68 @@ def test_merge_shards():
|
|||||||
]
|
]
|
||||||
|
|
||||||
active_shards.should.have.length_of(2)
|
active_shards.should.have.length_of(2)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_merge_shards_boto3():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=4)
|
||||||
|
|
||||||
|
for index in range(1, 100):
|
||||||
|
client.put_record(
|
||||||
|
StreamName=stream_name,
|
||||||
|
Data=f"data_{index}".encode("utf-8"),
|
||||||
|
PartitionKey=str(index),
|
||||||
|
)
|
||||||
|
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shards = stream["Shards"]
|
||||||
|
shards.should.have.length_of(4)
|
||||||
|
|
||||||
|
client.merge_shards(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardToMerge="shardId-000000000000",
|
||||||
|
AdjacentShardToMerge="shardId-000000000001",
|
||||||
|
)
|
||||||
|
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shards = stream["Shards"]
|
||||||
|
active_shards = [
|
||||||
|
shard
|
||||||
|
for shard in shards
|
||||||
|
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
|
||||||
|
]
|
||||||
|
active_shards.should.have.length_of(3)
|
||||||
|
|
||||||
|
client.merge_shards(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardToMerge="shardId-000000000002",
|
||||||
|
AdjacentShardToMerge="shardId-000000000000",
|
||||||
|
)
|
||||||
|
|
||||||
|
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shards = stream["Shards"]
|
||||||
|
active_shards = [
|
||||||
|
shard
|
||||||
|
for shard in shards
|
||||||
|
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
|
||||||
|
]
|
||||||
|
active_shards.should.have.length_of(2)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_merge_shards_invalid_arg():
|
||||||
|
client = boto3.client("kinesis", region_name="eu-west-2")
|
||||||
|
stream_name = "my_stream_summary"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=4)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.merge_shards(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardToMerge="shardId-000000000000",
|
||||||
|
AdjacentShardToMerge="shardId-000000000002",
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("InvalidArgumentException")
|
||||||
|
err["Message"].should.equal("shardId-000000000002")
|
||||||
|
Loading…
Reference in New Issue
Block a user