Techdebt: Replace sure with regular assertions in Kinesis (#6579)

This commit is contained in:
Bert Blommers 2023-08-01 09:47:40 +00:00 committed by GitHub
parent 58a981a002
commit 6843eb4c86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 339 additions and 349 deletions

View File

@ -10,8 +10,6 @@ from dateutil.tz import tzlocal
from moto import mock_kinesis
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
import sure # noqa # pylint: disable=unused-import
@mock_kinesis
def test_stream_creation_on_demand():
@ -24,7 +22,7 @@ def test_stream_creation_on_demand():
# AWS starts with 4 shards by default
shard_list = client.list_shards(StreamARN=stream_arn)["Shards"]
shard_list.should.have.length_of(4)
assert len(shard_list) == 4
# Cannot update-shard-count when we're in on-demand mode
with pytest.raises(ClientError) as exc:
@ -32,9 +30,10 @@ def test_stream_creation_on_demand():
StreamARN=stream_arn, TargetShardCount=3, ScalingType="UNIFORM_SCALING"
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
f"Request is invalid. Stream my_stream under account {ACCOUNT_ID} is in On-Demand mode."
assert err["Code"] == "ValidationException"
assert (
err["Message"]
== f"Request is invalid. Stream my_stream under account {ACCOUNT_ID} is in On-Demand mode."
)
@ -54,7 +53,7 @@ def test_update_stream_mode():
resp = client.describe_stream_summary(StreamName="my_stream")
stream = resp["StreamDescriptionSummary"]
stream.should.have.key("StreamModeDetails").equals({"StreamMode": "PROVISIONED"})
assert stream["StreamModeDetails"] == {"StreamMode": "PROVISIONED"}
@mock_kinesis
@ -63,27 +62,25 @@ def test_describe_non_existent_stream():
with pytest.raises(ClientError) as exc:
client.describe_stream_summary(StreamName="not-a-stream")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal(
"Stream not-a-stream under account 123456789012 not found."
)
assert err["Code"] == "ResourceNotFoundException"
assert err["Message"] == "Stream not-a-stream under account 123456789012 not found."
@mock_kinesis
def test_list_and_delete_stream():
client = boto3.client("kinesis", region_name="us-west-2")
client.list_streams()["StreamNames"].should.have.length_of(0)
assert len(client.list_streams()["StreamNames"]) == 0
client.create_stream(StreamName="stream1", ShardCount=1)
client.create_stream(StreamName="stream2", ShardCount=1)
client.list_streams()["StreamNames"].should.have.length_of(2)
assert len(client.list_streams()["StreamNames"]) == 2
client.delete_stream(StreamName="stream1")
client.list_streams()["StreamNames"].should.have.length_of(1)
assert len(client.list_streams()["StreamNames"]) == 1
stream_arn = get_stream_arn(client, "stream2")
client.delete_stream(StreamARN=stream_arn)
client.list_streams()["StreamNames"].should.have.length_of(0)
assert len(client.list_streams()["StreamNames"]) == 0
@mock_kinesis
@ -92,10 +89,8 @@ def test_delete_unknown_stream():
with pytest.raises(ClientError) as exc:
client.delete_stream(StreamName="not-a-stream")
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal(
"Stream not-a-stream under account 123456789012 not found."
)
assert err["Code"] == "ResourceNotFoundException"
assert err["Message"] == "Stream not-a-stream under account 123456789012 not found."
@mock_kinesis
@ -108,13 +103,13 @@ def test_list_many_streams():
resp = conn.list_streams()
stream_names = resp["StreamNames"]
has_more_streams = resp["HasMoreStreams"]
stream_names.should.have.length_of(10)
has_more_streams.should.be(True)
assert len(stream_names) == 10
assert has_more_streams is True
resp2 = conn.list_streams(ExclusiveStartStreamName=stream_names[-1])
stream_names = resp2["StreamNames"]
has_more_streams = resp2["HasMoreStreams"]
stream_names.should.have.length_of(1)
has_more_streams.should.equal(False)
assert len(stream_names) == 1
assert has_more_streams is False
@mock_kinesis
@ -127,18 +122,19 @@ def test_describe_stream_summary():
resp = conn.describe_stream_summary(StreamName=stream_name)
stream = resp["StreamDescriptionSummary"]
stream["StreamName"].should.equal(stream_name)
stream["OpenShardCount"].should.equal(shard_count)
stream["StreamARN"].should.equal(
f"arn:aws:kinesis:us-west-2:{ACCOUNT_ID}:stream/{stream_name}"
assert stream["StreamName"] == stream_name
assert stream["OpenShardCount"] == shard_count
assert (
stream["StreamARN"]
== f"arn:aws:kinesis:us-west-2:{ACCOUNT_ID}:stream/{stream_name}"
)
stream["StreamStatus"].should.equal("ACTIVE")
assert stream["StreamStatus"] == "ACTIVE"
stream_arn = get_stream_arn(conn, stream_name)
resp = conn.describe_stream_summary(StreamARN=stream_arn)
stream = resp["StreamDescriptionSummary"]
stream["StreamName"].should.equal(stream_name)
assert stream["StreamName"] == stream_name
@mock_kinesis
@ -156,8 +152,8 @@ def test_basic_shard_iterator():
shard_iterator = resp["ShardIterator"]
resp = client.get_records(ShardIterator=shard_iterator)
resp.should.have.key("Records").length_of(0)
resp.should.have.key("MillisBehindLatest").equal(0)
assert len(resp["Records"]) == 0
assert resp["MillisBehindLatest"] == 0
@mock_kinesis
@ -179,8 +175,8 @@ def test_basic_shard_iterator_by_stream_arn():
resp = client.get_records(
StreamARN=stream["StreamARN"], ShardIterator=shard_iterator
)
resp.should.have.key("Records").length_of(0)
resp.should.have.key("MillisBehindLatest").equal(0)
assert len(resp["Records"]) == 0
assert resp["MillisBehindLatest"] == 0
@mock_kinesis
@ -195,11 +191,12 @@ def test_get_invalid_shard_iterator():
StreamName=stream_name, ShardId="123", ShardIteratorType="TRIM_HORIZON"
)
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
assert err["Code"] == "ResourceNotFoundException"
# There is some magic in AWS, that '123' is automatically converted into 'shardId-000000000123'
# AWS itself returns this normalized ID in the error message, not the given id
err["Message"].should.equal(
f"Shard 123 in stream {stream_name} under account {ACCOUNT_ID} does not exist"
assert (
err["Message"]
== f"Shard 123 in stream {stream_name} under account {ACCOUNT_ID} does not exist"
)
@ -227,12 +224,12 @@ def test_put_records():
shard_iterator = resp["ShardIterator"]
resp = client.get_records(ShardIterator=shard_iterator)
resp["Records"].should.have.length_of(5)
assert len(resp["Records"]) == 5
record = resp["Records"][0]
record["Data"].should.equal(data)
record["PartitionKey"].should.equal(partition_key)
record["SequenceNumber"].should.equal("1")
assert record["Data"] == data
assert record["PartitionKey"] == partition_key
assert record["SequenceNumber"] == "1"
@mock_kinesis
@ -257,12 +254,12 @@ def test_get_records_limit():
# Retrieve only 3 records
resp = client.get_records(ShardIterator=shard_iterator, Limit=3)
resp["Records"].should.have.length_of(3)
assert len(resp["Records"]) == 3
# Then get the rest of the results
next_shard_iterator = resp["NextShardIterator"]
response = client.get_records(ShardIterator=next_shard_iterator)
response["Records"].should.have.length_of(2)
assert len(response["Records"]) == 2
@mock_kinesis
@ -300,8 +297,8 @@ def test_get_records_at_sequence_number():
resp = client.get_records(ShardIterator=shard_iterator)
# And the first result returned should be the second item
resp["Records"][0]["SequenceNumber"].should.equal(sequence_nr)
resp["Records"][0]["Data"].should.equal(b"data_2")
assert resp["Records"][0]["SequenceNumber"] == sequence_nr
assert resp["Records"][0]["Data"] == b"data_2"
@mock_kinesis
@ -339,9 +336,9 @@ def test_get_records_after_sequence_number():
resp = client.get_records(ShardIterator=shard_iterator)
# And the first result returned should be the second item
resp["Records"][0]["SequenceNumber"].should.equal("3")
resp["Records"][0]["Data"].should.equal(b"data_3")
resp["MillisBehindLatest"].should.equal(0)
assert resp["Records"][0]["SequenceNumber"] == "3"
assert resp["Records"][0]["Data"] == b"data_3"
assert resp["MillisBehindLatest"] == 0
@mock_kinesis
@ -383,11 +380,11 @@ def test_get_records_latest():
resp = client.get_records(ShardIterator=shard_iterator)
# And the first result returned should be the second item
resp["Records"].should.have.length_of(1)
resp["Records"][0]["SequenceNumber"].should.equal("5")
resp["Records"][0]["PartitionKey"].should.equal("last_record")
resp["Records"][0]["Data"].should.equal(b"last_record")
resp["MillisBehindLatest"].should.equal(0)
assert len(resp["Records"]) == 1
assert resp["Records"][0]["SequenceNumber"] == "5"
assert resp["Records"][0]["PartitionKey"] == "last_record"
assert resp["Records"][0]["Data"] == b"last_record"
assert resp["MillisBehindLatest"] == 0
@mock_kinesis
@ -428,10 +425,10 @@ def test_get_records_at_timestamp():
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(len(keys))
assert len(response["Records"]) == len(keys)
partition_keys = [r["PartitionKey"] for r in response["Records"]]
partition_keys.should.equal(keys)
response["MillisBehindLatest"].should.equal(0)
assert partition_keys == keys
assert response["MillisBehindLatest"] == 0
@mock_kinesis
@ -457,10 +454,10 @@ def test_get_records_at_very_old_timestamp():
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(len(keys))
assert len(response["Records"]) == len(keys)
partition_keys = [r["PartitionKey"] for r in response["Records"]]
partition_keys.should.equal(keys)
response["MillisBehindLatest"].should.equal(0)
assert partition_keys == keys
assert response["MillisBehindLatest"] == 0
@mock_kinesis
@ -487,12 +484,10 @@ def test_get_records_timestamp_filtering():
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(1)
response["Records"][0]["PartitionKey"].should.equal("1")
response["Records"][0]["ApproximateArrivalTimestamp"].should.be.greater_than(
timestamp
)
response["MillisBehindLatest"].should.equal(0)
assert len(response["Records"]) == 1
assert response["Records"][0]["PartitionKey"] == "1"
assert response["Records"][0]["ApproximateArrivalTimestamp"] > timestamp
assert response["MillisBehindLatest"] == 0
@mock_kinesis
@ -513,8 +508,8 @@ def test_get_records_millis_behind_latest():
shard_iterator = response["ShardIterator"]
response = conn.get_records(ShardIterator=shard_iterator, Limit=1)
response["Records"].should.have.length_of(1)
response["MillisBehindLatest"].should.be.greater_than(0)
assert len(response["Records"]) == 1
assert response["MillisBehindLatest"] > 0
@mock_kinesis
@ -543,8 +538,8 @@ def test_get_records_at_very_new_timestamp():
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(0)
response["MillisBehindLatest"].should.equal(0)
assert len(response["Records"]) == 0
assert response["MillisBehindLatest"] == 0
@mock_kinesis
@ -568,8 +563,8 @@ def test_get_records_from_empty_stream_at_timestamp():
response = conn.get_records(ShardIterator=shard_iterator)
response["Records"].should.have.length_of(0)
response["MillisBehindLatest"].should.equal(0)
assert len(response["Records"]) == 0
assert response["MillisBehindLatest"] == 0
@mock_kinesis
@ -583,7 +578,7 @@ def test_valid_increase_stream_retention_period():
)
response = conn.describe_stream(StreamName=stream_name)
response["StreamDescription"]["RetentionPeriodHours"].should.equal(40)
assert response["StreamDescription"]["RetentionPeriodHours"] == 40
@mock_kinesis
@ -599,9 +594,10 @@ def test_invalid_increase_stream_retention_period():
conn.increase_stream_retention_period(
StreamName=stream_name, RetentionPeriodHours=25
)
ex.value.response["Error"]["Code"].should.equal("InvalidArgumentException")
ex.value.response["Error"]["Message"].should.equal(
"Requested retention period (25 hours) for stream my_stream can not be shorter than existing retention period (30 hours). Use DecreaseRetentionPeriod API."
assert ex.value.response["Error"]["Code"] == "InvalidArgumentException"
assert (
ex.value.response["Error"]["Message"]
== "Requested retention period (25 hours) for stream my_stream can not be shorter than existing retention period (30 hours). Use DecreaseRetentionPeriod API."
)
@ -616,9 +612,10 @@ def test_invalid_increase_stream_retention_too_low():
StreamName=stream_name, RetentionPeriodHours=20
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal(
"Minimum allowed retention period is 24 hours. Requested retention period (20 hours) is too short."
assert err["Code"] == "InvalidArgumentException"
assert (
err["Message"]
== "Minimum allowed retention period is 24 hours. Requested retention period (20 hours) is too short."
)
@ -633,9 +630,10 @@ def test_invalid_increase_stream_retention_too_high():
StreamName=stream_name, RetentionPeriodHours=9999
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal(
"Maximum allowed retention period is 8760 hours. Requested retention period (9999 hours) is too long."
assert err["Code"] == "InvalidArgumentException"
assert (
err["Message"]
== "Maximum allowed retention period is 8760 hours. Requested retention period (9999 hours) is too long."
)
@ -654,13 +652,13 @@ def test_valid_decrease_stream_retention_period():
)
response = conn.describe_stream(StreamName=stream_name)
response["StreamDescription"]["RetentionPeriodHours"].should.equal(25)
assert response["StreamDescription"]["RetentionPeriodHours"] == 25
conn.increase_stream_retention_period(StreamARN=stream_arn, RetentionPeriodHours=29)
conn.decrease_stream_retention_period(StreamARN=stream_arn, RetentionPeriodHours=26)
response = conn.describe_stream(StreamARN=stream_arn)
response["StreamDescription"]["RetentionPeriodHours"].should.equal(26)
assert response["StreamDescription"]["RetentionPeriodHours"] == 26
@mock_kinesis
@ -674,9 +672,10 @@ def test_decrease_stream_retention_period_upwards():
StreamName=stream_name, RetentionPeriodHours=40
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal(
"Requested retention period (40 hours) for stream decrease_stream can not be longer than existing retention period (24 hours). Use IncreaseRetentionPeriod API."
assert err["Code"] == "InvalidArgumentException"
assert (
err["Message"]
== "Requested retention period (40 hours) for stream decrease_stream can not be longer than existing retention period (24 hours). Use IncreaseRetentionPeriod API."
)
@ -691,9 +690,10 @@ def test_decrease_stream_retention_period_too_low():
StreamName=stream_name, RetentionPeriodHours=4
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal(
"Minimum allowed retention period is 24 hours. Requested retention period (4 hours) is too short."
assert err["Code"] == "InvalidArgumentException"
assert (
err["Message"]
== "Minimum allowed retention period is 24 hours. Requested retention period (4 hours) is too short."
)
@ -708,9 +708,10 @@ def test_decrease_stream_retention_period_too_high():
StreamName=stream_name, RetentionPeriodHours=9999
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal(
"Maximum allowed retention period is 8760 hours. Requested retention period (9999 hours) is too long."
assert err["Code"] == "InvalidArgumentException"
assert (
err["Message"]
== "Maximum allowed retention period is 8760 hours. Requested retention period (9999 hours) is too long."
)
@ -727,8 +728,8 @@ def test_invalid_shard_iterator_type():
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="invalid-type"
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal("Invalid ShardIteratorType: invalid-type")
assert err["Code"] == "InvalidArgumentException"
assert err["Message"] == "Invalid ShardIteratorType: invalid-type"
@mock_kinesis
@ -743,32 +744,32 @@ def test_add_list_remove_tags():
)
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
tags.should.have.length_of(4)
tags.should.contain({"Key": "tag1", "Value": "val1"})
tags.should.contain({"Key": "tag2", "Value": "val2"})
tags.should.contain({"Key": "tag3", "Value": "val3"})
tags.should.contain({"Key": "tag4", "Value": "val4"})
assert len(tags) == 4
assert {"Key": "tag1", "Value": "val1"} in tags
assert {"Key": "tag2", "Value": "val2"} in tags
assert {"Key": "tag3", "Value": "val3"} in tags
assert {"Key": "tag4", "Value": "val4"} in tags
client.add_tags_to_stream(StreamARN=stream_arn, Tags={"tag5": "val5"})
tags = client.list_tags_for_stream(StreamARN=stream_arn)["Tags"]
tags.should.have.length_of(5)
tags.should.contain({"Key": "tag5", "Value": "val5"})
assert len(tags) == 5
assert {"Key": "tag5", "Value": "val5"} in tags
client.remove_tags_from_stream(StreamName=stream_name, TagKeys=["tag2", "tag3"])
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
tags.should.have.length_of(3)
tags.should.contain({"Key": "tag1", "Value": "val1"})
tags.should.contain({"Key": "tag4", "Value": "val4"})
tags.should.contain({"Key": "tag5", "Value": "val5"})
assert len(tags) == 3
assert {"Key": "tag1", "Value": "val1"} in tags
assert {"Key": "tag4", "Value": "val4"} in tags
assert {"Key": "tag5", "Value": "val5"} in tags
client.remove_tags_from_stream(StreamARN=stream_arn, TagKeys=["tag4"])
tags = client.list_tags_for_stream(StreamName=stream_name)["Tags"]
tags.should.have.length_of(2)
tags.should.contain({"Key": "tag1", "Value": "val1"})
tags.should.contain({"Key": "tag5", "Value": "val5"})
assert len(tags) == 2
assert {"Key": "tag1", "Value": "val1"} in tags
assert {"Key": "tag5", "Value": "val5"} in tags
@mock_kinesis
@ -793,7 +794,7 @@ def test_merge_shards():
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shards = stream["Shards"]
shards.should.have.length_of(4)
assert len(shards) == 4
client.merge_shards(
StreamName=stream_name,
@ -805,7 +806,7 @@ def test_merge_shards():
shards = stream["Shards"]
# Old shards still exist, but are closed. A new shard is created out of the old one
shards.should.have.length_of(5)
assert len(shards) == 5
# Only three shards are active - the two merged shards are closed
active_shards = [
@ -813,7 +814,7 @@ def test_merge_shards():
for shard in shards
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
active_shards.should.have.length_of(3)
assert len(active_shards) == 3
client.merge_shards(
StreamARN=stream_arn,
@ -829,23 +830,21 @@ def test_merge_shards():
for shard in shards
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
active_shards.should.have.length_of(2)
assert len(active_shards) == 2
for shard in active_shards:
del shard["HashKeyRange"]
del shard["SequenceNumberRange"]
# Original shard #3 is still active (0,1,2 have been merged and closed
active_shards.should.contain({"ShardId": "shardId-000000000003"})
assert {"ShardId": "shardId-000000000003"} in active_shards
# Shard #4 was the child of #0 and #1
# Shard #5 is the child of #4 (parent) and #2 (adjacent-parent)
active_shards.should.contain(
{
"ShardId": "shardId-000000000005",
"ParentShardId": "shardId-000000000004",
"AdjacentParentShardId": "shardId-000000000002",
}
)
assert {
"ShardId": "shardId-000000000005",
"ParentShardId": "shardId-000000000004",
"AdjacentParentShardId": "shardId-000000000002",
} in active_shards
@mock_kinesis
@ -861,8 +860,8 @@ def test_merge_shards_invalid_arg():
AdjacentShardToMerge="shardId-000000000002",
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal("shardId-000000000002")
assert err["Code"] == "InvalidArgumentException"
assert err["Message"] == "shardId-000000000002"
def get_stream_arn(client, stream_name):

View File

@ -6,8 +6,6 @@ from moto import mock_kinesis
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from .test_kinesis import get_stream_arn
import sure # noqa # pylint: disable=unused-import
@mock_kinesis
def test_describe_stream_limit_parameter():
@ -17,26 +15,26 @@ def test_describe_stream_limit_parameter():
client.create_stream(StreamName=stream_name, ShardCount=5)
without_filter = client.describe_stream(StreamName=stream_name)["StreamDescription"]
without_filter["Shards"].should.have.length_of(5)
without_filter["HasMoreShards"].should.equal(False)
assert len(without_filter["Shards"]) == 5
assert without_filter["HasMoreShards"] is False
with_filter = client.describe_stream(StreamName=stream_name, Limit=2)[
"StreamDescription"
]
with_filter["Shards"].should.have.length_of(2)
with_filter["HasMoreShards"].should.equal(True)
assert len(with_filter["Shards"]) == 2
assert with_filter["HasMoreShards"] is True
with_filter = client.describe_stream(StreamName=stream_name, Limit=5)[
"StreamDescription"
]
with_filter["Shards"].should.have.length_of(5)
with_filter["HasMoreShards"].should.equal(False)
assert len(with_filter["Shards"]) == 5
assert with_filter["HasMoreShards"] is False
with_filter = client.describe_stream(StreamName=stream_name, Limit=6)[
"StreamDescription"
]
with_filter["Shards"].should.have.length_of(5)
with_filter["HasMoreShards"].should.equal(False)
assert len(with_filter["Shards"]) == 5
assert with_filter["HasMoreShards"] is False
@mock_kinesis
@ -53,23 +51,24 @@ def test_list_shards():
)
shard_list = conn.list_shards(StreamName=stream_name)["Shards"]
shard_list.should.have.length_of(2)
assert len(shard_list) == 2
# Verify IDs
[s["ShardId"] for s in shard_list].should.equal(
["shardId-000000000000", "shardId-000000000001"]
)
assert [s["ShardId"] for s in shard_list] == [
"shardId-000000000000",
"shardId-000000000001",
]
# Verify hash range
for shard in shard_list:
shard.should.have.key("HashKeyRange")
shard["HashKeyRange"].should.have.key("StartingHashKey")
shard["HashKeyRange"].should.have.key("EndingHashKey")
shard_list[0]["HashKeyRange"]["EndingHashKey"].should.equal(
str(int(shard_list[1]["HashKeyRange"]["StartingHashKey"]) - 1)
assert "HashKeyRange" in shard
assert "StartingHashKey" in shard["HashKeyRange"]
assert "EndingHashKey" in shard["HashKeyRange"]
assert shard_list[0]["HashKeyRange"]["EndingHashKey"] == str(
int(shard_list[1]["HashKeyRange"]["StartingHashKey"]) - 1
)
# Verify sequence numbers
for shard in shard_list:
shard.should.have.key("SequenceNumberRange")
shard["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
assert "SequenceNumberRange" in shard
assert "StartingSequenceNumber" in shard["SequenceNumberRange"]
@mock_kinesis
@ -80,46 +79,43 @@ def test_list_shards_paging():
# Get shard 1-10
shard_list = client.list_shards(StreamName=stream_name)
shard_list["Shards"].should.have.length_of(10)
shard_list.should_not.have.key("NextToken")
assert len(shard_list["Shards"]) == 10
assert "NextToken" not in shard_list
# Get shard 1-4
resp = client.list_shards(StreamName=stream_name, MaxResults=4)
resp["Shards"].should.have.length_of(4)
[s["ShardId"] for s in resp["Shards"]].should.equal(
[
"shardId-000000000000",
"shardId-000000000001",
"shardId-000000000002",
"shardId-000000000003",
]
)
resp.should.have.key("NextToken")
assert len(resp["Shards"]) == 4
assert [s["ShardId"] for s in resp["Shards"]] == [
"shardId-000000000000",
"shardId-000000000001",
"shardId-000000000002",
"shardId-000000000003",
]
assert "NextToken" in resp
# Get shard 4-8
resp = client.list_shards(
StreamName=stream_name, MaxResults=4, NextToken=str(resp["NextToken"])
)
resp["Shards"].should.have.length_of(4)
[s["ShardId"] for s in resp["Shards"]].should.equal(
[
"shardId-000000000004",
"shardId-000000000005",
"shardId-000000000006",
"shardId-000000000007",
]
)
resp.should.have.key("NextToken")
assert len(resp["Shards"]) == 4
assert [s["ShardId"] for s in resp["Shards"]] == [
"shardId-000000000004",
"shardId-000000000005",
"shardId-000000000006",
"shardId-000000000007",
]
assert "NextToken" in resp
# Get shard 8-10
resp = client.list_shards(
StreamName=stream_name, MaxResults=4, NextToken=str(resp["NextToken"])
)
resp["Shards"].should.have.length_of(2)
[s["ShardId"] for s in resp["Shards"]].should.equal(
["shardId-000000000008", "shardId-000000000009"]
)
resp.should_not.have.key("NextToken")
assert len(resp["Shards"]) == 2
assert [s["ShardId"] for s in resp["Shards"]] == [
"shardId-000000000008",
"shardId-000000000009",
]
assert "NextToken" not in resp
@mock_kinesis
@ -129,26 +125,26 @@ def test_create_shard():
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
desc.should.have.key("StreamName").equal("my-stream")
desc.should.have.key("StreamARN").equal(
f"arn:aws:kinesis:us-west-2:{ACCOUNT_ID}:stream/my-stream"
assert desc["StreamName"] == "my-stream"
assert (
desc["StreamARN"] == f"arn:aws:kinesis:us-west-2:{ACCOUNT_ID}:stream/my-stream"
)
desc.should.have.key("Shards").length_of(2)
desc.should.have.key("StreamStatus").equals("ACTIVE")
desc.should.have.key("HasMoreShards").equals(False)
desc.should.have.key("RetentionPeriodHours").equals(24)
desc.should.have.key("StreamCreationTimestamp")
desc.should.have.key("EnhancedMonitoring").should.equal([{"ShardLevelMetrics": []}])
desc.should.have.key("EncryptionType").should.equal("NONE")
assert len(desc["Shards"]) == 2
assert desc["StreamStatus"] == "ACTIVE"
assert desc["HasMoreShards"] is False
assert desc["RetentionPeriodHours"] == 24
assert "StreamCreationTimestamp" in desc
assert desc["EnhancedMonitoring"] == [{"ShardLevelMetrics": []}]
assert desc["EncryptionType"] == "NONE"
shards = desc["Shards"]
shards[0].should.have.key("ShardId").equal("shardId-000000000000")
shards[0].should.have.key("HashKeyRange")
shards[0]["HashKeyRange"].should.have.key("StartingHashKey").equals("0")
shards[0]["HashKeyRange"].should.have.key("EndingHashKey")
shards[0].should.have.key("SequenceNumberRange")
shards[0]["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
shards[0]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber")
assert shards[0]["ShardId"] == "shardId-000000000000"
assert "HashKeyRange" in shards[0]
assert shards[0]["HashKeyRange"]["StartingHashKey"] == "0"
assert "EndingHashKey" in shards[0]["HashKeyRange"]
assert "SequenceNumberRange" in shards[0]
assert "StartingSequenceNumber" in shards[0]["SequenceNumberRange"]
assert "EndingSequenceNumber" not in shards[0]["SequenceNumberRange"]
@mock_kinesis
@ -163,9 +159,10 @@ def test_split_shard_with_invalid_name():
NewStartingHashKey="170141183460469231731687303715884105728",
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value '?' at 'shardToSplit' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+"
assert err["Code"] == "ValidationException"
assert (
err["Message"]
== "1 validation error detected: Value '?' at 'shardToSplit' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+"
)
@ -181,9 +178,10 @@ def test_split_shard_with_unknown_name():
NewStartingHashKey="170141183460469231731687303715884105728",
)
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal(
"Could not find shard unknown in stream my-stream under account 123456789012."
assert err["Code"] == "ResourceNotFoundException"
assert (
err["Message"]
== "Could not find shard unknown in stream my-stream under account 123456789012."
)
@ -199,9 +197,10 @@ def test_split_shard_invalid_hashkey():
NewStartingHashKey="sth",
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value 'sth' at 'newStartingHashKey' failed to satisfy constraint: Member must satisfy regular expression pattern: 0|([1-9]\\d{0,38})"
assert err["Code"] == "ValidationException"
assert (
err["Message"]
== "1 validation error detected: Value 'sth' at 'newStartingHashKey' failed to satisfy constraint: Member must satisfy regular expression pattern: 0|([1-9]\\d{0,38})"
)
@ -217,9 +216,10 @@ def test_split_shard_hashkey_out_of_bounds():
NewStartingHashKey="170141183460469231731687303715884000000",
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal(
f"NewStartingHashKey 170141183460469231731687303715884000000 used in SplitShard() on shard shardId-000000000001 in stream my-stream under account {ACCOUNT_ID} is not both greater than one plus the shard's StartingHashKey 170141183460469231731687303715884105728 and less than the shard's EndingHashKey 340282366920938463463374607431768211455."
assert err["Code"] == "InvalidArgumentException"
assert (
err["Message"]
== f"NewStartingHashKey 170141183460469231731687303715884000000 used in SplitShard() on shard shardId-000000000001 in stream my-stream under account {ACCOUNT_ID} is not both greater than one plus the shard's StartingHashKey 170141183460469231731687303715884105728 and less than the shard's EndingHashKey 340282366920938463463374607431768211455."
)
@ -248,32 +248,34 @@ def test_split_shard():
resp = client.describe_stream(StreamName=stream_name)["StreamDescription"]
shards = resp["Shards"]
shards.should.have.length_of(4)
shards[0].should.have.key("ShardId").equals("shardId-000000000000")
shards[0].should.have.key("HashKeyRange")
shards[0].shouldnt.have.key("ParentShardId")
assert len(shards) == 4
assert shards[0]["ShardId"] == "shardId-000000000000"
assert "HashKeyRange" in shards[0]
assert "ParentShardId" not in shards[0]
shards[1].should.have.key("ShardId").equals("shardId-000000000001")
shards[1].shouldnt.have.key("ParentShardId")
shards[1].should.have.key("HashKeyRange")
shards[1]["HashKeyRange"].should.have.key("StartingHashKey").equals(
original_shards[1]["HashKeyRange"]["StartingHashKey"]
assert shards[1]["ShardId"] == "shardId-000000000001"
assert "ParentShardId" not in shards[1]
assert "HashKeyRange" in shards[1]
assert (
shards[1]["HashKeyRange"]["StartingHashKey"]
== original_shards[1]["HashKeyRange"]["StartingHashKey"]
)
shards[1]["HashKeyRange"].should.have.key("EndingHashKey").equals(
original_shards[1]["HashKeyRange"]["EndingHashKey"]
assert (
shards[1]["HashKeyRange"]["EndingHashKey"]
== original_shards[1]["HashKeyRange"]["EndingHashKey"]
)
shards[1]["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
shards[1]["SequenceNumberRange"].should.have.key("EndingSequenceNumber")
assert "StartingSequenceNumber" in shards[1]["SequenceNumberRange"]
assert "EndingSequenceNumber" in shards[1]["SequenceNumberRange"]
shards[2].should.have.key("ShardId").equals("shardId-000000000002")
shards[2].should.have.key("ParentShardId").equals(shards[1]["ShardId"])
shards[2]["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
shards[2]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber")
assert shards[2]["ShardId"] == "shardId-000000000002"
assert shards[2]["ParentShardId"] == shards[1]["ShardId"]
assert "StartingSequenceNumber" in shards[2]["SequenceNumberRange"]
assert "EndingSequenceNumber" not in shards[2]["SequenceNumberRange"]
shards[3].should.have.key("ShardId").equals("shardId-000000000003")
shards[3].should.have.key("ParentShardId").equals(shards[1]["ShardId"])
shards[3]["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
shards[3]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber")
assert shards[3]["ShardId"] == "shardId-000000000003"
assert shards[3]["ParentShardId"] == shards[1]["ShardId"]
assert "StartingSequenceNumber" in shards[3]["SequenceNumberRange"]
assert "EndingSequenceNumber" not in shards[3]["SequenceNumberRange"]
@mock_kinesis
@ -295,9 +297,10 @@ def test_split_shard_that_was_split_before():
NewStartingHashKey="170141183460469231731687303715884105829",
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal(
f"Shard shardId-000000000001 in stream my-stream under account {ACCOUNT_ID} has already been merged or split, and thus is not eligible for merging or splitting."
assert err["Code"] == "InvalidArgumentException"
assert (
err["Message"]
== f"Shard shardId-000000000001 in stream my-stream under account {ACCOUNT_ID} has already been merged or split, and thus is not eligible for merging or splitting."
)
@ -322,22 +325,22 @@ def test_update_shard_count(initial, target, expected_total):
StreamName="my-stream", TargetShardCount=target, ScalingType="UNIFORM_SCALING"
)
resp.should.have.key("StreamName").equals("my-stream")
resp.should.have.key("CurrentShardCount").equals(initial)
resp.should.have.key("TargetShardCount").equals(target)
assert resp["StreamName"] == "my-stream"
assert resp["CurrentShardCount"] == initial
assert resp["TargetShardCount"] == target
stream = client.describe_stream(StreamName="my-stream")["StreamDescription"]
stream["StreamStatus"].should.equal("ACTIVE")
stream["Shards"].should.have.length_of(expected_total)
assert stream["StreamStatus"] == "ACTIVE"
assert len(stream["Shards"]) == expected_total
active_shards = [
shard
for shard in stream["Shards"]
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
active_shards.should.have.length_of(target)
assert len(active_shards) == target
resp = client.describe_stream_summary(StreamName="my-stream")
stream = resp["StreamDescriptionSummary"]
stream["OpenShardCount"].should.equal(target)
assert stream["OpenShardCount"] == target

View File

@ -1,5 +1,4 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_kinesis, mock_cloudformation
@ -17,8 +16,8 @@ def test_kinesis_cloudformation_create_stream():
provisioned_resource = cf_conn.list_stack_resources(StackName=stack_name)[
"StackResourceSummaries"
][0]
provisioned_resource["LogicalResourceId"].should.equal("MyStream")
len(provisioned_resource["PhysicalResourceId"]).should.be.greater_than(0)
assert provisioned_resource["LogicalResourceId"] == "MyStream"
assert len(provisioned_resource["PhysicalResourceId"]) > 0
@mock_cloudformation
@ -56,7 +55,7 @@ Outputs:
stream_description = kinesis_conn.describe_stream(StreamName=output_stream_name)[
"StreamDescription"
]
output_stream_arn.should.equal(stream_description["StreamARN"])
assert output_stream_arn == stream_description["StreamARN"]
@mock_cloudformation
@ -83,19 +82,19 @@ Resources:
cf_conn.create_stack(StackName=stack_name, TemplateBody=template)
stack_description = cf_conn.describe_stacks(StackName=stack_name)["Stacks"][0]
stack_description["StackName"].should.equal(stack_name)
assert stack_description["StackName"] == stack_name
kinesis_conn = boto3.client("kinesis", region_name="us-east-1")
stream_description = kinesis_conn.describe_stream(StreamName="MyStream")[
"StreamDescription"
]
stream_description["RetentionPeriodHours"].should.equal(48)
assert stream_description["RetentionPeriodHours"] == 48
tags = kinesis_conn.list_tags_for_stream(StreamName="MyStream")["Tags"]
tag1_value = [tag for tag in tags if tag["Key"] == "TagKey1"][0]["Value"]
tag2_value = [tag for tag in tags if tag["Key"] == "TagKey2"][0]["Value"]
tag1_value.should.equal("TagValue1")
tag2_value.should.equal("TagValue2")
assert tag1_value == "TagValue1"
assert tag2_value == "TagValue2"
shards_provisioned = len(
[
@ -104,7 +103,7 @@ Resources:
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
)
shards_provisioned.should.equal(4)
assert shards_provisioned == 4
template = """
Resources:
@ -125,13 +124,13 @@ Resources:
stream_description = kinesis_conn.describe_stream(StreamName="MyStream")[
"StreamDescription"
]
stream_description["RetentionPeriodHours"].should.equal(24)
assert stream_description["RetentionPeriodHours"] == 24
tags = kinesis_conn.list_tags_for_stream(StreamName="MyStream")["Tags"]
tag1_value = [tag for tag in tags if tag["Key"] == "TagKey1"][0]["Value"]
tag2_value = [tag for tag in tags if tag["Key"] == "TagKey2"][0]["Value"]
tag1_value.should.equal("TagValue1a")
tag2_value.should.equal("TagValue2a")
assert tag1_value == "TagValue1a"
assert tag2_value == "TagValue2a"
shards_provisioned = len(
[
@ -140,7 +139,7 @@ Resources:
if "EndingSequenceNumber" not in shard["SequenceNumberRange"]
]
)
shards_provisioned.should.equal(6)
assert shards_provisioned == 6
@mock_cloudformation
@ -160,14 +159,14 @@ Resources:
cf_conn.create_stack(StackName=stack_name, TemplateBody=template)
stack_description = cf_conn.describe_stacks(StackName=stack_name)["Stacks"][0]
stack_description["StackName"].should.equal(stack_name)
assert stack_description["StackName"] == stack_name
kinesis_conn = boto3.client("kinesis", region_name="us-east-1")
stream_description = kinesis_conn.describe_stream(StreamName="MyStream")[
"StreamDescription"
]
stream_description["StreamName"].should.equal("MyStream")
assert stream_description["StreamName"] == "MyStream"
cf_conn.delete_stack(StackName=stack_name)
streams = kinesis_conn.list_streams()["StreamNames"]
len(streams).should.equal(0)
assert len(streams) == 0

View File

@ -11,8 +11,8 @@ def test_enable_encryption():
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
desc.should.have.key("EncryptionType").should.equal("NONE")
desc.shouldnt.have.key("KeyId")
assert desc["EncryptionType"] == "NONE"
assert "KeyId" not in desc
client.start_stream_encryption(
StreamName="my-stream", EncryptionType="KMS", KeyId="n/a"
@ -20,8 +20,8 @@ def test_enable_encryption():
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
desc.should.have.key("EncryptionType").should.equal("KMS")
desc.should.have.key("KeyId").equals("n/a")
assert desc["EncryptionType"] == "KMS"
assert desc["KeyId"] == "n/a"
@mock_kinesis
@ -31,7 +31,7 @@ def test_disable_encryption():
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
desc.should.have.key("EncryptionType").should.equal("NONE")
assert desc["EncryptionType"] == "NONE"
client.start_stream_encryption(
StreamName="my-stream", EncryptionType="KMS", KeyId="n/a"
@ -43,8 +43,8 @@ def test_disable_encryption():
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
desc.should.have.key("EncryptionType").should.equal("NONE")
desc.shouldnt.have.key("KeyId")
assert desc["EncryptionType"] == "NONE"
assert "KeyId" not in desc
@mock_kinesis
@ -55,7 +55,7 @@ def test_disable_encryption__using_arns():
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
desc.should.have.key("EncryptionType").should.equal("NONE")
assert desc["EncryptionType"] == "NONE"
client.start_stream_encryption(
StreamARN=stream_arn, EncryptionType="KMS", KeyId="n/a"
@ -67,5 +67,5 @@ def test_disable_encryption__using_arns():
resp = client.describe_stream(StreamName="my-stream")
desc = resp["StreamDescription"]
desc.should.have.key("EncryptionType").should.equal("NONE")
desc.shouldnt.have.key("KeyId")
assert desc["EncryptionType"] == "NONE"
assert "KeyId" not in desc

View File

@ -15,11 +15,12 @@ def test_enable_enhanced_monitoring_all():
StreamName=stream_name, ShardLevelMetrics=["ALL"]
)
resp.should.have.key("StreamName").equals(stream_name)
resp.should.have.key("CurrentShardLevelMetrics").equals([])
resp.should.have.key("DesiredShardLevelMetrics").equals(["ALL"])
resp.should.have.key("StreamARN").equals(
f"arn:aws:kinesis:us-east-1:{DEFAULT_ACCOUNT_ID}:stream/{stream_name}"
assert resp["StreamName"] == stream_name
assert resp["CurrentShardLevelMetrics"] == []
assert resp["DesiredShardLevelMetrics"] == ["ALL"]
assert (
resp["StreamARN"]
== f"arn:aws:kinesis:us-east-1:{DEFAULT_ACCOUNT_ID}:stream/{stream_name}"
)
@ -35,7 +36,7 @@ def test_enable_enhanced_monitoring_is_persisted():
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
metrics = stream["EnhancedMonitoring"][0]["ShardLevelMetrics"]
set(metrics).should.equal({"IncomingBytes", "OutgoingBytes"})
assert set(metrics) == {"IncomingBytes", "OutgoingBytes"}
@mock_kinesis
@ -52,22 +53,20 @@ def test_enable_enhanced_monitoring_in_steps():
StreamName=stream_name, ShardLevelMetrics=["WriteProvisionedThroughputExceeded"]
)
resp.should.have.key("CurrentShardLevelMetrics").should.have.length_of(2)
resp["CurrentShardLevelMetrics"].should.contain("IncomingBytes")
resp["CurrentShardLevelMetrics"].should.contain("OutgoingBytes")
resp.should.have.key("DesiredShardLevelMetrics").should.have.length_of(3)
resp["DesiredShardLevelMetrics"].should.contain("IncomingBytes")
resp["DesiredShardLevelMetrics"].should.contain("OutgoingBytes")
resp["DesiredShardLevelMetrics"].should.contain(
"WriteProvisionedThroughputExceeded"
)
assert len(resp["CurrentShardLevelMetrics"]) == 2
assert "IncomingBytes" in resp["CurrentShardLevelMetrics"]
assert "OutgoingBytes" in resp["CurrentShardLevelMetrics"]
assert len(resp["DesiredShardLevelMetrics"]) == 3
assert "IncomingBytes" in resp["DesiredShardLevelMetrics"]
assert "OutgoingBytes" in resp["DesiredShardLevelMetrics"]
assert "WriteProvisionedThroughputExceeded" in resp["DesiredShardLevelMetrics"]
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
metrics = stream["EnhancedMonitoring"][0]["ShardLevelMetrics"]
metrics.should.have.length_of(3)
metrics.should.contain("IncomingBytes")
metrics.should.contain("OutgoingBytes")
metrics.should.contain("WriteProvisionedThroughputExceeded")
assert len(metrics) == 3
assert "IncomingBytes" in metrics
assert "OutgoingBytes" in metrics
assert "WriteProvisionedThroughputExceeded" in metrics
@mock_kinesis
@ -90,35 +89,32 @@ def test_disable_enhanced_monitoring():
StreamName=stream_name, ShardLevelMetrics=["OutgoingBytes"]
)
resp.should.have.key("StreamName").equals(stream_name)
resp.should.have.key("StreamARN").equals(
f"arn:aws:kinesis:us-east-1:{DEFAULT_ACCOUNT_ID}:stream/{stream_name}"
assert resp["StreamName"] == stream_name
assert (
resp["StreamARN"]
== f"arn:aws:kinesis:us-east-1:{DEFAULT_ACCOUNT_ID}:stream/{stream_name}"
)
resp.should.have.key("CurrentShardLevelMetrics").should.have.length_of(3)
resp["CurrentShardLevelMetrics"].should.contain("IncomingBytes")
resp["CurrentShardLevelMetrics"].should.contain("OutgoingBytes")
resp["CurrentShardLevelMetrics"].should.contain(
"WriteProvisionedThroughputExceeded"
)
resp.should.have.key("DesiredShardLevelMetrics").should.have.length_of(2)
resp["DesiredShardLevelMetrics"].should.contain("IncomingBytes")
resp["DesiredShardLevelMetrics"].should.contain(
"WriteProvisionedThroughputExceeded"
)
assert len(resp["CurrentShardLevelMetrics"]) == 3
assert "IncomingBytes" in resp["CurrentShardLevelMetrics"]
assert "OutgoingBytes" in resp["CurrentShardLevelMetrics"]
assert "WriteProvisionedThroughputExceeded" in resp["CurrentShardLevelMetrics"]
assert len(resp["DesiredShardLevelMetrics"]) == 2
assert "IncomingBytes" in resp["DesiredShardLevelMetrics"]
assert "WriteProvisionedThroughputExceeded" in resp["DesiredShardLevelMetrics"]
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
metrics = stream["EnhancedMonitoring"][0]["ShardLevelMetrics"]
metrics.should.have.length_of(2)
metrics.should.contain("IncomingBytes")
metrics.should.contain("WriteProvisionedThroughputExceeded")
assert len(metrics) == 2
assert "IncomingBytes" in metrics
assert "WriteProvisionedThroughputExceeded" in metrics
resp = client.disable_enhanced_monitoring(
StreamARN=stream_arn, ShardLevelMetrics=["IncomingBytes"]
)
resp.should.have.key("CurrentShardLevelMetrics").should.have.length_of(2)
resp.should.have.key("DesiredShardLevelMetrics").should.have.length_of(1)
assert len(resp["CurrentShardLevelMetrics"]) == 2
assert len(resp["DesiredShardLevelMetrics"]) == 1
@mock_kinesis
@ -142,4 +138,4 @@ def test_disable_enhanced_monitoring_all():
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
metrics = stream["EnhancedMonitoring"][0]["ShardLevelMetrics"]
metrics.should.equal([])
assert metrics == []

View File

@ -20,7 +20,7 @@ def test_list_stream_consumers():
resp = client.list_stream_consumers(StreamARN=stream_arn)
resp.should.have.key("Consumers").equals([])
assert resp["Consumers"] == []
@mock_kinesis
@ -31,27 +31,29 @@ def test_register_stream_consumer():
resp = client.register_stream_consumer(
StreamARN=stream_arn, ConsumerName="newconsumer"
)
resp.should.have.key("Consumer")
assert "Consumer" in resp
consumer = resp["Consumer"]
consumer.should.have.key("ConsumerName").equals("newconsumer")
consumer.should.have.key("ConsumerARN").equals(
f"arn:aws:kinesis:eu-west-1:{ACCOUNT_ID}:stream/my-stream/consumer/newconsumer"
assert consumer["ConsumerName"] == "newconsumer"
assert (
consumer["ConsumerARN"]
== f"arn:aws:kinesis:eu-west-1:{ACCOUNT_ID}:stream/my-stream/consumer/newconsumer"
)
consumer.should.have.key("ConsumerStatus").equals("ACTIVE")
consumer.should.have.key("ConsumerCreationTimestamp")
assert consumer["ConsumerStatus"] == "ACTIVE"
assert "ConsumerCreationTimestamp" in consumer
resp = client.list_stream_consumers(StreamARN=stream_arn)
resp.should.have.key("Consumers").length_of(1)
assert len(resp["Consumers"]) == 1
consumer = resp["Consumers"][0]
consumer.should.have.key("ConsumerName").equals("newconsumer")
consumer.should.have.key("ConsumerARN").equals(
f"arn:aws:kinesis:eu-west-1:{ACCOUNT_ID}:stream/my-stream/consumer/newconsumer"
assert consumer["ConsumerName"] == "newconsumer"
assert (
consumer["ConsumerARN"]
== f"arn:aws:kinesis:eu-west-1:{ACCOUNT_ID}:stream/my-stream/consumer/newconsumer"
)
consumer.should.have.key("ConsumerStatus").equals("ACTIVE")
consumer.should.have.key("ConsumerCreationTimestamp")
assert consumer["ConsumerStatus"] == "ACTIVE"
assert "ConsumerCreationTimestamp" in consumer
@mock_kinesis
@ -63,14 +65,14 @@ def test_describe_stream_consumer_by_name():
resp = client.describe_stream_consumer(
StreamARN=stream_arn, ConsumerName="newconsumer"
)
resp.should.have.key("ConsumerDescription")
assert "ConsumerDescription" in resp
consumer = resp["ConsumerDescription"]
consumer.should.have.key("ConsumerName").equals("newconsumer")
consumer.should.have.key("ConsumerARN")
consumer.should.have.key("ConsumerStatus").equals("ACTIVE")
consumer.should.have.key("ConsumerCreationTimestamp")
consumer.should.have.key("StreamARN").equals(stream_arn)
assert consumer["ConsumerName"] == "newconsumer"
assert "ConsumerARN" in consumer
assert consumer["ConsumerStatus"] == "ACTIVE"
assert "ConsumerCreationTimestamp" in consumer
assert consumer["StreamARN"] == stream_arn
@mock_kinesis
@ -83,14 +85,14 @@ def test_describe_stream_consumer_by_arn():
consumer_arn = resp["Consumer"]["ConsumerARN"]
resp = client.describe_stream_consumer(ConsumerARN=consumer_arn)
resp.should.have.key("ConsumerDescription")
assert "ConsumerDescription" in resp
consumer = resp["ConsumerDescription"]
consumer.should.have.key("ConsumerName").equals("newconsumer")
consumer.should.have.key("ConsumerARN")
consumer.should.have.key("ConsumerStatus").equals("ACTIVE")
consumer.should.have.key("ConsumerCreationTimestamp")
consumer.should.have.key("StreamARN").equals(stream_arn)
assert consumer["ConsumerName"] == "newconsumer"
assert "ConsumerARN" in consumer
assert consumer["ConsumerStatus"] == "ACTIVE"
assert "ConsumerCreationTimestamp" in consumer
assert consumer["StreamARN"] == stream_arn
@mock_kinesis
@ -102,10 +104,8 @@ def test_describe_stream_consumer_unknown():
with pytest.raises(ClientError) as exc:
client.describe_stream_consumer(ConsumerARN=unknown_arn)
err = exc.value.response["Error"]
err["Code"].should.equal("ResourceNotFoundException")
err["Message"].should.equal(
f"Consumer {unknown_arn}, account {ACCOUNT_ID} not found."
)
assert err["Code"] == "ResourceNotFoundException"
assert err["Message"] == f"Consumer {unknown_arn}, account {ACCOUNT_ID} not found."
@mock_kinesis
@ -116,15 +116,11 @@ def test_deregister_stream_consumer_by_name():
client.register_stream_consumer(StreamARN=stream_arn, ConsumerName="consumer1")
client.register_stream_consumer(StreamARN=stream_arn, ConsumerName="consumer2")
client.list_stream_consumers(StreamARN=stream_arn)[
"Consumers"
].should.have.length_of(2)
assert len(client.list_stream_consumers(StreamARN=stream_arn)["Consumers"]) == 2
client.deregister_stream_consumer(StreamARN=stream_arn, ConsumerName="consumer1")
client.list_stream_consumers(StreamARN=stream_arn)[
"Consumers"
].should.have.length_of(1)
assert len(client.list_stream_consumers(StreamARN=stream_arn)["Consumers"]) == 1
@mock_kinesis
@ -138,12 +134,8 @@ def test_deregister_stream_consumer_by_arn():
consumer1_arn = resp["Consumer"]["ConsumerARN"]
client.register_stream_consumer(StreamARN=stream_arn, ConsumerName="consumer2")
client.list_stream_consumers(StreamARN=stream_arn)[
"Consumers"
].should.have.length_of(2)
assert len(client.list_stream_consumers(StreamARN=stream_arn)["Consumers"]) == 2
client.deregister_stream_consumer(ConsumerARN=consumer1_arn)
client.list_stream_consumers(StreamARN=stream_arn)[
"Consumers"
].should.have.length_of(1)
assert len(client.list_stream_consumers(StreamARN=stream_arn)["Consumers"]) == 1

View File

@ -1,6 +1,5 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_kinesis
@ -19,9 +18,10 @@ def test_record_data_exceeds_1mb():
StreamName="my_stream",
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value at 'records.1.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576"
assert err["Code"] == "ValidationException"
assert (
err["Message"]
== "1 validation error detected: Value at 'records.1.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576"
)
@ -39,9 +39,10 @@ def test_record_data_and_partition_key_exceeds_1mb():
StreamName="my_stream",
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value at 'records.1.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576"
assert err["Code"] == "ValidationException"
assert (
err["Message"]
== "1 validation error detected: Value at 'records.1.member.data' failed to satisfy constraint: Member must have length less than or equal to 1048576"
)
@ -84,8 +85,8 @@ def test_total_record_data_exceeds_5mb():
StreamName="my_stream",
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidArgumentException")
err["Message"].should.equal("Records size exceeds 5 MB limit")
assert err["Code"] == "InvalidArgumentException"
assert err["Message"] == "Records size exceeds 5 MB limit"
@mock_kinesis
@ -112,7 +113,8 @@ def test_too_many_records():
StreamName="my_stream",
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value at 'records' failed to satisfy constraint: Member must have length less than or equal to 500"
assert err["Code"] == "ValidationException"
assert (
err["Message"]
== "1 validation error detected: Value at 'records' failed to satisfy constraint: Member must have length less than or equal to 500"
)

View File

@ -1,5 +1,4 @@
import json
import sure # noqa # pylint: disable=unused-import
import moto.server as server
from moto import mock_kinesis
@ -13,4 +12,4 @@ def test_list_streams():
res = test_client.get("/?Action=ListStreams")
json_data = json.loads(res.data.decode("utf-8"))
json_data.should.equal({"HasMoreStreams": False, "StreamNames": []})
assert json_data == {"HasMoreStreams": False, "StreamNames": []}