Kinesis - Fix split_shards behaviour (#4540)
This commit is contained in:
parent
b9e38ecc76
commit
d7dd8fb4c5
@ -27,9 +27,9 @@ class StreamNotFoundError(ResourceNotFoundError):
|
|||||||
|
|
||||||
|
|
||||||
class ShardNotFoundError(ResourceNotFoundError):
|
class ShardNotFoundError(ResourceNotFoundError):
|
||||||
def __init__(self, shard_id):
|
def __init__(self, shard_id, stream):
|
||||||
super(ShardNotFoundError, self).__init__(
|
super(ShardNotFoundError, self).__init__(
|
||||||
"Shard {0} under account {1} not found.".format(shard_id, ACCOUNT_ID)
|
f"Could not find shard {shard_id} in stream {stream} under account {ACCOUNT_ID}."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -39,3 +39,14 @@ class InvalidArgumentError(BadRequest):
|
|||||||
self.description = json.dumps(
|
self.description = json.dumps(
|
||||||
{"message": message, "__type": "InvalidArgumentException"}
|
{"message": message, "__type": "InvalidArgumentException"}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ValidationException(BadRequest):
|
||||||
|
def __init__(self, value, position, regex_to_match):
|
||||||
|
super(ValidationException, self).__init__()
|
||||||
|
self.description = json.dumps(
|
||||||
|
{
|
||||||
|
"message": f"1 validation error detected: Value '{value}' at '{position}' failed to satisfy constraint: Member must satisfy regular expression pattern: {regex_to_match}",
|
||||||
|
"__type": "ValidationException",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
@ -18,6 +18,7 @@ from .exceptions import (
|
|||||||
ResourceInUseError,
|
ResourceInUseError,
|
||||||
ResourceNotFoundError,
|
ResourceNotFoundError,
|
||||||
InvalidArgumentError,
|
InvalidArgumentError,
|
||||||
|
ValidationException,
|
||||||
)
|
)
|
||||||
from .utils import (
|
from .utils import (
|
||||||
compose_shard_iterator,
|
compose_shard_iterator,
|
||||||
@ -46,12 +47,13 @@ class Record(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class Shard(BaseModel):
|
class Shard(BaseModel):
|
||||||
def __init__(self, shard_id, starting_hash, ending_hash):
|
def __init__(self, shard_id, starting_hash, ending_hash, parent=None):
|
||||||
self._shard_id = shard_id
|
self._shard_id = shard_id
|
||||||
self.starting_hash = starting_hash
|
self.starting_hash = starting_hash
|
||||||
self.ending_hash = ending_hash
|
self.ending_hash = ending_hash
|
||||||
self.records = OrderedDict()
|
self.records = OrderedDict()
|
||||||
self.is_open = True
|
self.is_open = True
|
||||||
|
self.parent = parent
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def shard_id(self):
|
def shard_id(self):
|
||||||
@ -125,6 +127,8 @@ class Shard(BaseModel):
|
|||||||
},
|
},
|
||||||
"ShardId": self.shard_id,
|
"ShardId": self.shard_id,
|
||||||
}
|
}
|
||||||
|
if self.parent:
|
||||||
|
response["ParentShardId"] = self.parent
|
||||||
if not self.is_open:
|
if not self.is_open:
|
||||||
response["SequenceNumberRange"][
|
response["SequenceNumberRange"][
|
||||||
"EndingSequenceNumber"
|
"EndingSequenceNumber"
|
||||||
@ -146,6 +150,8 @@ class Stream(CloudFormationModel):
|
|||||||
self.retention_period_hours = (
|
self.retention_period_hours = (
|
||||||
retention_period_hours if retention_period_hours else 24
|
retention_period_hours if retention_period_hours else 24
|
||||||
)
|
)
|
||||||
|
self.enhanced_monitoring = [{"ShardLevelMetrics": []}]
|
||||||
|
self.encryption_type = "NONE"
|
||||||
|
|
||||||
def update_shard_count(self, shard_count):
|
def update_shard_count(self, shard_count):
|
||||||
# ToDo: This was extracted from init. It's only accurate for new streams.
|
# ToDo: This was extracted from init. It's only accurate for new streams.
|
||||||
@ -165,7 +171,7 @@ class Stream(CloudFormationModel):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def arn(self):
|
def arn(self):
|
||||||
return "arn:aws:kinesis:{region}:{account_number}:{stream_name}".format(
|
return "arn:aws:kinesis:{region}:{account_number}:stream/{stream_name}".format(
|
||||||
region=self.region,
|
region=self.region,
|
||||||
account_number=self.account_number,
|
account_number=self.account_number,
|
||||||
stream_name=self.stream_name,
|
stream_name=self.stream_name,
|
||||||
@ -175,7 +181,7 @@ class Stream(CloudFormationModel):
|
|||||||
if shard_id in self.shards:
|
if shard_id in self.shards:
|
||||||
return self.shards[shard_id]
|
return self.shards[shard_id]
|
||||||
else:
|
else:
|
||||||
raise ShardNotFoundError(shard_id)
|
raise ShardNotFoundError(shard_id, stream="")
|
||||||
|
|
||||||
def get_shard_for_key(self, partition_key, explicit_hash_key):
|
def get_shard_for_key(self, partition_key, explicit_hash_key):
|
||||||
if not isinstance(partition_key, str):
|
if not isinstance(partition_key, str):
|
||||||
@ -214,9 +220,12 @@ class Stream(CloudFormationModel):
|
|||||||
"StreamDescription": {
|
"StreamDescription": {
|
||||||
"StreamARN": self.arn,
|
"StreamARN": self.arn,
|
||||||
"StreamName": self.stream_name,
|
"StreamName": self.stream_name,
|
||||||
|
"StreamCreationTimestamp": str(self.creation_datetime),
|
||||||
"StreamStatus": self.status,
|
"StreamStatus": self.status,
|
||||||
"HasMoreShards": len(requested_shards) != len(all_shards),
|
"HasMoreShards": len(requested_shards) != len(all_shards),
|
||||||
"RetentionPeriodHours": self.retention_period_hours,
|
"RetentionPeriodHours": self.retention_period_hours,
|
||||||
|
"EnhancedMonitoring": self.enhanced_monitoring,
|
||||||
|
"EncryptionType": self.encryption_type,
|
||||||
"Shards": [shard.to_json() for shard in requested_shards],
|
"Shards": [shard.to_json() for shard in requested_shards],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -374,7 +383,12 @@ class KinesisBackend(BaseBackend):
|
|||||||
):
|
):
|
||||||
# Validate params
|
# Validate params
|
||||||
stream = self.describe_stream(stream_name)
|
stream = self.describe_stream(stream_name)
|
||||||
shard = stream.get_shard(shard_id)
|
try:
|
||||||
|
shard = stream.get_shard(shard_id)
|
||||||
|
except ShardNotFoundError:
|
||||||
|
raise ResourceNotFoundError(
|
||||||
|
message=f"Shard {shard_id} in stream {stream_name} under account {ACCOUNT_ID} does not exist"
|
||||||
|
)
|
||||||
|
|
||||||
shard_iterator = compose_new_shard_iterator(
|
shard_iterator = compose_new_shard_iterator(
|
||||||
stream_name,
|
stream_name,
|
||||||
@ -440,25 +454,58 @@ class KinesisBackend(BaseBackend):
|
|||||||
def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
|
def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
|
||||||
stream = self.describe_stream(stream_name)
|
stream = self.describe_stream(stream_name)
|
||||||
|
|
||||||
|
if not re.match("[a-zA-Z0-9_.-]+", shard_to_split):
|
||||||
|
raise ValidationException(
|
||||||
|
value=shard_to_split,
|
||||||
|
position="shardToSplit",
|
||||||
|
regex_to_match="[a-zA-Z0-9_.-]+",
|
||||||
|
)
|
||||||
|
|
||||||
if shard_to_split not in stream.shards:
|
if shard_to_split not in stream.shards:
|
||||||
raise ResourceNotFoundError(shard_to_split)
|
raise ShardNotFoundError(shard_id=shard_to_split, stream=stream_name)
|
||||||
|
|
||||||
if not re.match(r"0|([1-9]\d{0,38})", new_starting_hash_key):
|
if not re.match(r"0|([1-9]\d{0,38})", new_starting_hash_key):
|
||||||
raise InvalidArgumentError(new_starting_hash_key)
|
raise ValidationException(
|
||||||
|
value=new_starting_hash_key,
|
||||||
|
position="newStartingHashKey",
|
||||||
|
regex_to_match=r"0|([1-9]\d{0,38})",
|
||||||
|
)
|
||||||
new_starting_hash_key = int(new_starting_hash_key)
|
new_starting_hash_key = int(new_starting_hash_key)
|
||||||
|
|
||||||
shard = stream.shards[shard_to_split]
|
shard = stream.shards[shard_to_split]
|
||||||
|
|
||||||
|
if shard.starting_hash < new_starting_hash_key < shard.ending_hash:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise InvalidArgumentError(
|
||||||
|
message=f"NewStartingHashKey {new_starting_hash_key} used in SplitShard() on shard {shard_to_split} in stream {stream_name} under account {ACCOUNT_ID} is not both greater than one plus the shard's StartingHashKey {shard.starting_hash} and less than the shard's EndingHashKey {(shard.ending_hash-1)}."
|
||||||
|
)
|
||||||
|
|
||||||
|
if not shard.is_open:
|
||||||
|
raise InvalidArgumentError(
|
||||||
|
message=f"Shard {shard.shard_id} in stream {stream_name} under account {ACCOUNT_ID} has already been merged or split, and thus is not eligible for merging or splitting."
|
||||||
|
)
|
||||||
|
|
||||||
last_id = sorted(stream.shards.values(), key=attrgetter("_shard_id"))[
|
last_id = sorted(stream.shards.values(), key=attrgetter("_shard_id"))[
|
||||||
-1
|
-1
|
||||||
]._shard_id
|
]._shard_id
|
||||||
|
|
||||||
if shard.starting_hash < new_starting_hash_key < shard.ending_hash:
|
# Create two new shards
|
||||||
new_shard = Shard(last_id + 1, new_starting_hash_key, shard.ending_hash)
|
new_shard_1 = Shard(
|
||||||
shard.ending_hash = new_starting_hash_key
|
last_id + 1,
|
||||||
stream.shards[new_shard.shard_id] = new_shard
|
starting_hash=shard.starting_hash,
|
||||||
else:
|
ending_hash=new_starting_hash_key - 1,
|
||||||
raise InvalidArgumentError(new_starting_hash_key)
|
parent=shard.shard_id,
|
||||||
|
)
|
||||||
|
new_shard_2 = Shard(
|
||||||
|
last_id + 2,
|
||||||
|
starting_hash=new_starting_hash_key,
|
||||||
|
ending_hash=shard.ending_hash,
|
||||||
|
parent=shard.shard_id,
|
||||||
|
)
|
||||||
|
stream.shards[new_shard_1.shard_id] = new_shard_1
|
||||||
|
stream.shards[new_shard_2.shard_id] = new_shard_2
|
||||||
|
shard.is_open = False
|
||||||
|
|
||||||
records = shard.records
|
records = shard.records
|
||||||
shard.records = OrderedDict()
|
shard.records = OrderedDict()
|
||||||
@ -473,10 +520,10 @@ class KinesisBackend(BaseBackend):
|
|||||||
stream = self.describe_stream(stream_name)
|
stream = self.describe_stream(stream_name)
|
||||||
|
|
||||||
if shard_to_merge not in stream.shards:
|
if shard_to_merge not in stream.shards:
|
||||||
raise ResourceNotFoundError(shard_to_merge)
|
raise ShardNotFoundError(shard_to_merge, stream=stream_name)
|
||||||
|
|
||||||
if adjacent_shard_to_merge not in stream.shards:
|
if adjacent_shard_to_merge not in stream.shards:
|
||||||
raise ResourceNotFoundError(adjacent_shard_to_merge)
|
raise ShardNotFoundError(adjacent_shard_to_merge, stream=stream_name)
|
||||||
|
|
||||||
shard1 = stream.shards[shard_to_merge]
|
shard1 = stream.shards[shard_to_merge]
|
||||||
shard2 = stream.shards[adjacent_shard_to_merge]
|
shard2 = stream.shards[adjacent_shard_to_merge]
|
||||||
|
@ -28,7 +28,7 @@ def test_create_cluster():
|
|||||||
stream["StreamName"].should.equal("my_stream")
|
stream["StreamName"].should.equal("my_stream")
|
||||||
stream["HasMoreShards"].should.equal(False)
|
stream["HasMoreShards"].should.equal(False)
|
||||||
stream["StreamARN"].should.equal(
|
stream["StreamARN"].should.equal(
|
||||||
"arn:aws:kinesis:us-west-2:{}:my_stream".format(ACCOUNT_ID)
|
"arn:aws:kinesis:us-west-2:{}:stream/my_stream".format(ACCOUNT_ID)
|
||||||
)
|
)
|
||||||
stream["StreamStatus"].should.equal("ACTIVE")
|
stream["StreamStatus"].should.equal("ACTIVE")
|
||||||
|
|
||||||
@ -134,7 +134,7 @@ def test_describe_stream_summary():
|
|||||||
stream["StreamName"].should.equal(stream_name)
|
stream["StreamName"].should.equal(stream_name)
|
||||||
stream["OpenShardCount"].should.equal(shard_count)
|
stream["OpenShardCount"].should.equal(shard_count)
|
||||||
stream["StreamARN"].should.equal(
|
stream["StreamARN"].should.equal(
|
||||||
"arn:aws:kinesis:us-west-2:{}:{}".format(ACCOUNT_ID, stream_name)
|
"arn:aws:kinesis:us-west-2:{}:stream/{}".format(ACCOUNT_ID, stream_name)
|
||||||
)
|
)
|
||||||
stream["StreamStatus"].should.equal("ACTIVE")
|
stream["StreamStatus"].should.equal("ACTIVE")
|
||||||
|
|
||||||
@ -204,7 +204,11 @@ def test_get_invalid_shard_iterator_boto3():
|
|||||||
)
|
)
|
||||||
err = exc.value.response["Error"]
|
err = exc.value.response["Error"]
|
||||||
err["Code"].should.equal("ResourceNotFoundException")
|
err["Code"].should.equal("ResourceNotFoundException")
|
||||||
err["Message"].should.equal("Shard 123 under account 123456789012 not found.")
|
# There is some magic in AWS, that '123' is automatically converted into 'shardId-000000000123'
|
||||||
|
# AWS itself returns this normalized ID in the error message, not the given id
|
||||||
|
err["Message"].should.equal(
|
||||||
|
f"Shard 123 in stream {stream_name} under account {ACCOUNT_ID} does not exist"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# Has boto3 equivalent
|
# Has boto3 equivalent
|
||||||
@ -971,95 +975,6 @@ def test_remove_tags():
|
|||||||
tags.get("tag2").should.equal(None)
|
tags.get("tag2").should.equal(None)
|
||||||
|
|
||||||
|
|
||||||
# Has boto3 equivalent
|
|
||||||
@mock_kinesis_deprecated
|
|
||||||
def test_split_shard():
|
|
||||||
conn = boto.kinesis.connect_to_region("us-west-2")
|
|
||||||
stream_name = "my_stream"
|
|
||||||
|
|
||||||
conn.create_stream(stream_name, 2)
|
|
||||||
|
|
||||||
# Create some data
|
|
||||||
for index in range(1, 100):
|
|
||||||
conn.put_record(stream_name, str(index), str(index))
|
|
||||||
|
|
||||||
stream_response = conn.describe_stream(stream_name)
|
|
||||||
|
|
||||||
stream = stream_response["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(2)
|
|
||||||
|
|
||||||
shard_range = shards[0]["HashKeyRange"]
|
|
||||||
new_starting_hash = (
|
|
||||||
int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
|
|
||||||
) // 2
|
|
||||||
conn.split_shard("my_stream", shards[0]["ShardId"], str(new_starting_hash))
|
|
||||||
|
|
||||||
stream_response = conn.describe_stream(stream_name)
|
|
||||||
|
|
||||||
stream = stream_response["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(3)
|
|
||||||
|
|
||||||
shard_range = shards[2]["HashKeyRange"]
|
|
||||||
new_starting_hash = (
|
|
||||||
int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
|
|
||||||
) // 2
|
|
||||||
conn.split_shard("my_stream", shards[2]["ShardId"], str(new_starting_hash))
|
|
||||||
|
|
||||||
stream_response = conn.describe_stream(stream_name)
|
|
||||||
|
|
||||||
stream = stream_response["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(4)
|
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis
|
|
||||||
def test_split_shard_boto3():
|
|
||||||
client = boto3.client("kinesis", region_name="eu-west-2")
|
|
||||||
stream_name = "my_stream_summary"
|
|
||||||
client.create_stream(StreamName=stream_name, ShardCount=2)
|
|
||||||
|
|
||||||
for index in range(1, 100):
|
|
||||||
client.put_record(
|
|
||||||
StreamName=stream_name,
|
|
||||||
Data=f"data_{index}".encode("utf-8"),
|
|
||||||
PartitionKey=str(index),
|
|
||||||
)
|
|
||||||
|
|
||||||
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(2)
|
|
||||||
|
|
||||||
shard_range = shards[0]["HashKeyRange"]
|
|
||||||
new_starting_hash = (
|
|
||||||
int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
|
|
||||||
) // 2
|
|
||||||
client.split_shard(
|
|
||||||
StreamName=stream_name,
|
|
||||||
ShardToSplit=shards[0]["ShardId"],
|
|
||||||
NewStartingHashKey=str(new_starting_hash),
|
|
||||||
)
|
|
||||||
|
|
||||||
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(3)
|
|
||||||
|
|
||||||
shard_range = shards[2]["HashKeyRange"]
|
|
||||||
new_starting_hash = (
|
|
||||||
int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
|
|
||||||
) // 2
|
|
||||||
client.split_shard(
|
|
||||||
StreamName=stream_name,
|
|
||||||
ShardToSplit=shards[2]["ShardId"],
|
|
||||||
NewStartingHashKey=str(new_starting_hash),
|
|
||||||
)
|
|
||||||
|
|
||||||
stream = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(4)
|
|
||||||
|
|
||||||
|
|
||||||
# Has boto3 equivalent
|
# Has boto3 equivalent
|
||||||
@mock_kinesis_deprecated
|
@mock_kinesis_deprecated
|
||||||
def test_merge_shards():
|
def test_merge_shards():
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
import boto3
|
import boto3
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
from moto import mock_kinesis
|
from moto import mock_kinesis
|
||||||
|
from moto.core import ACCOUNT_ID
|
||||||
|
|
||||||
import sure # noqa # pylint: disable=unused-import
|
import sure # noqa # pylint: disable=unused-import
|
||||||
|
|
||||||
@ -35,58 +38,6 @@ def test_describe_stream_limit_parameter():
|
|||||||
with_filter["HasMoreShards"].should.equal(False)
|
with_filter["HasMoreShards"].should.equal(False)
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis
|
|
||||||
def test_split_shard():
|
|
||||||
conn = boto3.client("kinesis", region_name="us-west-2")
|
|
||||||
stream_name = "my_stream"
|
|
||||||
|
|
||||||
conn.create_stream(StreamName=stream_name, ShardCount=2)
|
|
||||||
|
|
||||||
# Create some data
|
|
||||||
for index in range(1, 100):
|
|
||||||
conn.put_record(
|
|
||||||
StreamName=stream_name, Data="data:" + str(index), PartitionKey=str(index)
|
|
||||||
)
|
|
||||||
|
|
||||||
stream_response = conn.describe_stream(StreamName=stream_name)
|
|
||||||
|
|
||||||
stream = stream_response["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(2)
|
|
||||||
|
|
||||||
shard_range = shards[0]["HashKeyRange"]
|
|
||||||
new_starting_hash = (
|
|
||||||
int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
|
|
||||||
) // 2
|
|
||||||
conn.split_shard(
|
|
||||||
StreamName=stream_name,
|
|
||||||
ShardToSplit=shards[0]["ShardId"],
|
|
||||||
NewStartingHashKey=str(new_starting_hash),
|
|
||||||
)
|
|
||||||
|
|
||||||
stream_response = conn.describe_stream(StreamName=stream_name)
|
|
||||||
|
|
||||||
stream = stream_response["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(3)
|
|
||||||
|
|
||||||
shard_range = shards[2]["HashKeyRange"]
|
|
||||||
new_starting_hash = (
|
|
||||||
int(shard_range["EndingHashKey"]) + int(shard_range["StartingHashKey"])
|
|
||||||
) // 2
|
|
||||||
conn.split_shard(
|
|
||||||
StreamName=stream_name,
|
|
||||||
ShardToSplit=shards[2]["ShardId"],
|
|
||||||
NewStartingHashKey=str(new_starting_hash),
|
|
||||||
)
|
|
||||||
|
|
||||||
stream_response = conn.describe_stream(StreamName=stream_name)
|
|
||||||
|
|
||||||
stream = stream_response["StreamDescription"]
|
|
||||||
shards = stream["Shards"]
|
|
||||||
shards.should.have.length_of(4)
|
|
||||||
|
|
||||||
|
|
||||||
@mock_kinesis
|
@mock_kinesis
|
||||||
def test_list_shards():
|
def test_list_shards():
|
||||||
conn = boto3.client("kinesis", region_name="us-west-2")
|
conn = boto3.client("kinesis", region_name="us-west-2")
|
||||||
@ -168,3 +119,181 @@ def test_list_shards_paging():
|
|||||||
["shardId-000000000008", "shardId-000000000009"]
|
["shardId-000000000008", "shardId-000000000009"]
|
||||||
)
|
)
|
||||||
resp.should_not.have.key("NextToken")
|
resp.should_not.have.key("NextToken")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_create_shard():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
client.create_stream(StreamName="my-stream", ShardCount=2)
|
||||||
|
|
||||||
|
resp = client.describe_stream(StreamName="my-stream")
|
||||||
|
desc = resp["StreamDescription"]
|
||||||
|
desc.should.have.key("StreamName").equal("my-stream")
|
||||||
|
desc.should.have.key("StreamARN").equal(
|
||||||
|
f"arn:aws:kinesis:us-west-2:{ACCOUNT_ID}:stream/my-stream"
|
||||||
|
)
|
||||||
|
desc.should.have.key("Shards").length_of(2)
|
||||||
|
desc.should.have.key("StreamStatus").equals("ACTIVE")
|
||||||
|
desc.should.have.key("HasMoreShards").equals(False)
|
||||||
|
desc.should.have.key("RetentionPeriodHours").equals(24)
|
||||||
|
desc.should.have.key("StreamCreationTimestamp")
|
||||||
|
desc.should.have.key("EnhancedMonitoring").should.equal([{"ShardLevelMetrics": []}])
|
||||||
|
desc.should.have.key("EncryptionType").should.equal("NONE")
|
||||||
|
|
||||||
|
shards = desc["Shards"]
|
||||||
|
shards[0].should.have.key("ShardId").equal("shardId-000000000000")
|
||||||
|
shards[0].should.have.key("HashKeyRange")
|
||||||
|
shards[0]["HashKeyRange"].should.have.key("StartingHashKey").equals("0")
|
||||||
|
shards[0]["HashKeyRange"].should.have.key("EndingHashKey")
|
||||||
|
shards[0].should.have.key("SequenceNumberRange")
|
||||||
|
shards[0]["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
|
||||||
|
shards[0]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_split_shard_with_invalid_name():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
client.create_stream(StreamName="my-stream", ShardCount=2)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.split_shard(
|
||||||
|
StreamName="my-stream",
|
||||||
|
ShardToSplit="?",
|
||||||
|
NewStartingHashKey="170141183460469231731687303715884105728",
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("ValidationException")
|
||||||
|
err["Message"].should.equal(
|
||||||
|
"1 validation error detected: Value '?' at 'shardToSplit' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_split_shard_with_unknown_name():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
client.create_stream(StreamName="my-stream", ShardCount=2)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.split_shard(
|
||||||
|
StreamName="my-stream",
|
||||||
|
ShardToSplit="unknown",
|
||||||
|
NewStartingHashKey="170141183460469231731687303715884105728",
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("ResourceNotFoundException")
|
||||||
|
err["Message"].should.equal(
|
||||||
|
"Could not find shard unknown in stream my-stream under account 123456789012."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_split_shard_invalid_hashkey():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
client.create_stream(StreamName="my-stream", ShardCount=2)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.split_shard(
|
||||||
|
StreamName="my-stream",
|
||||||
|
ShardToSplit="shardId-000000000001",
|
||||||
|
NewStartingHashKey="sth",
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("ValidationException")
|
||||||
|
err["Message"].should.equal(
|
||||||
|
"1 validation error detected: Value 'sth' at 'newStartingHashKey' failed to satisfy constraint: Member must satisfy regular expression pattern: 0|([1-9]\\d{0,38})"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_split_shard_hashkey_out_of_bounds():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
client.create_stream(StreamName="my-stream", ShardCount=2)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.split_shard(
|
||||||
|
StreamName="my-stream",
|
||||||
|
ShardToSplit="shardId-000000000001",
|
||||||
|
NewStartingHashKey="170141183460469231731687303715884000000",
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("InvalidArgumentException")
|
||||||
|
err["Message"].should.equal(
|
||||||
|
f"NewStartingHashKey 170141183460469231731687303715884000000 used in SplitShard() on shard shardId-000000000001 in stream my-stream under account {ACCOUNT_ID} is not both greater than one plus the shard's StartingHashKey 170141183460469231731687303715884105728 and less than the shard's EndingHashKey 340282366920938463463374607431768211455."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_split_shard():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
stream_name = "my-stream"
|
||||||
|
client.create_stream(StreamName=stream_name, ShardCount=2)
|
||||||
|
|
||||||
|
for index in range(1, 100):
|
||||||
|
client.put_record(
|
||||||
|
StreamName=stream_name,
|
||||||
|
Data=f"data_{index}".encode("utf-8"),
|
||||||
|
PartitionKey=str(index),
|
||||||
|
)
|
||||||
|
|
||||||
|
original_shards = client.describe_stream(StreamName=stream_name)[
|
||||||
|
"StreamDescription"
|
||||||
|
]["Shards"]
|
||||||
|
|
||||||
|
client.split_shard(
|
||||||
|
StreamName=stream_name,
|
||||||
|
ShardToSplit="shardId-000000000001",
|
||||||
|
NewStartingHashKey="170141183460469231731687303715884105829",
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.describe_stream(StreamName=stream_name)["StreamDescription"]
|
||||||
|
shards = resp["Shards"]
|
||||||
|
shards.should.have.length_of(4)
|
||||||
|
shards[0].should.have.key("ShardId").equals("shardId-000000000000")
|
||||||
|
shards[0].should.have.key("HashKeyRange")
|
||||||
|
shards[0].shouldnt.have.key("ParentShardId")
|
||||||
|
|
||||||
|
shards[1].should.have.key("ShardId").equals("shardId-000000000001")
|
||||||
|
shards[1].shouldnt.have.key("ParentShardId")
|
||||||
|
shards[1].should.have.key("HashKeyRange")
|
||||||
|
shards[1]["HashKeyRange"].should.have.key("StartingHashKey").equals(
|
||||||
|
original_shards[1]["HashKeyRange"]["StartingHashKey"]
|
||||||
|
)
|
||||||
|
shards[1]["HashKeyRange"].should.have.key("EndingHashKey").equals(
|
||||||
|
original_shards[1]["HashKeyRange"]["EndingHashKey"]
|
||||||
|
)
|
||||||
|
shards[1]["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
|
||||||
|
shards[1]["SequenceNumberRange"].should.have.key("EndingSequenceNumber")
|
||||||
|
|
||||||
|
shards[2].should.have.key("ShardId").equals("shardId-000000000002")
|
||||||
|
shards[2].should.have.key("ParentShardId").equals(shards[1]["ShardId"])
|
||||||
|
shards[2]["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
|
||||||
|
shards[2]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber")
|
||||||
|
|
||||||
|
shards[3].should.have.key("ShardId").equals("shardId-000000000003")
|
||||||
|
shards[3].should.have.key("ParentShardId").equals(shards[1]["ShardId"])
|
||||||
|
shards[3]["SequenceNumberRange"].should.have.key("StartingSequenceNumber")
|
||||||
|
shards[3]["SequenceNumberRange"].shouldnt.have.key("EndingSequenceNumber")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kinesis
|
||||||
|
def test_split_shard_that_was_split_before():
|
||||||
|
client = boto3.client("kinesis", region_name="us-west-2")
|
||||||
|
client.create_stream(StreamName="my-stream", ShardCount=2)
|
||||||
|
|
||||||
|
client.split_shard(
|
||||||
|
StreamName="my-stream",
|
||||||
|
ShardToSplit="shardId-000000000001",
|
||||||
|
NewStartingHashKey="170141183460469231731687303715884105829",
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
client.split_shard(
|
||||||
|
StreamName="my-stream",
|
||||||
|
ShardToSplit="shardId-000000000001",
|
||||||
|
NewStartingHashKey="170141183460469231731687303715884105829",
|
||||||
|
)
|
||||||
|
err = exc.value.response["Error"]
|
||||||
|
err["Code"].should.equal("InvalidArgumentException")
|
||||||
|
err["Message"].should.equal(
|
||||||
|
f"Shard shardId-000000000001 in stream my-stream under account {ACCOUNT_ID} has already been merged or split, and thus is not eligible for merging or splitting."
|
||||||
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user