moto/tests/test_dynamodbstreams/test_dynamodbstreams.py

272 lines
9.8 KiB
Python
Raw Normal View History

import boto3
import pytest
2018-11-08 18:22:24 +00:00
2024-01-07 12:03:33 +00:00
from moto import mock_aws
2019-10-31 15:44:26 +00:00
class TestCore:
stream_arn = None
mocks = []
2019-10-31 15:44:26 +00:00
2022-10-28 00:37:11 +00:00
def setup_method(self):
2024-01-07 12:03:33 +00:00
self.mock = mock_aws()
self.mock.start()
2019-10-31 15:44:26 +00:00
# create a table with a stream
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodb", region_name="us-east-1")
resp = conn.create_table(
2019-10-31 15:44:26 +00:00
TableName="test-streams",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
StreamSpecification={
2019-10-31 15:44:26 +00:00
"StreamEnabled": True,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
)
2019-10-31 15:44:26 +00:00
self.stream_arn = resp["TableDescription"]["LatestStreamArn"]
2022-10-28 00:37:11 +00:00
def teardown_method(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodb", region_name="us-east-1")
conn.delete_table(TableName="test-streams")
self.stream_arn = None
2024-01-07 12:03:33 +00:00
try:
self.mock.stop()
except RuntimeError:
pass
def test_verify_stream(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodb", region_name="us-east-1")
resp = conn.describe_table(TableName="test-streams")
assert "LatestStreamArn" in resp["Table"]
def test_describe_stream(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodbstreams", region_name="us-east-1")
resp = conn.describe_stream(StreamArn=self.stream_arn)
2019-10-31 15:44:26 +00:00
assert "StreamDescription" in resp
desc = resp["StreamDescription"]
assert desc["StreamArn"] == self.stream_arn
assert desc["TableName"] == "test-streams"
def test_list_streams(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodbstreams", region_name="us-east-1")
resp = conn.list_streams()
2019-10-31 15:44:26 +00:00
assert resp["Streams"][0]["StreamArn"] == self.stream_arn
2019-10-31 15:44:26 +00:00
resp = conn.list_streams(TableName="no-stream")
assert not resp["Streams"]
def test_get_shard_iterator(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodbstreams", region_name="us-east-1")
resp = conn.describe_stream(StreamArn=self.stream_arn)
2019-10-31 15:44:26 +00:00
shard_id = resp["StreamDescription"]["Shards"][0]["ShardId"]
resp = conn.get_shard_iterator(
StreamArn=self.stream_arn,
ShardId=shard_id,
2019-10-31 15:44:26 +00:00
ShardIteratorType="TRIM_HORIZON",
)
2019-10-31 15:44:26 +00:00
assert "ShardIterator" in resp
def test_get_shard_iterator_at_sequence_number(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodbstreams", region_name="us-east-1")
resp = conn.describe_stream(StreamArn=self.stream_arn)
2019-10-31 15:44:26 +00:00
shard_id = resp["StreamDescription"]["Shards"][0]["ShardId"]
resp = conn.get_shard_iterator(
StreamArn=self.stream_arn,
ShardId=shard_id,
2019-10-31 15:44:26 +00:00
ShardIteratorType="AT_SEQUENCE_NUMBER",
SequenceNumber=resp["StreamDescription"]["Shards"][0][
"SequenceNumberRange"
]["StartingSequenceNumber"],
)
2019-10-31 15:44:26 +00:00
assert "ShardIterator" in resp
def test_get_shard_iterator_after_sequence_number(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodbstreams", region_name="us-east-1")
resp = conn.describe_stream(StreamArn=self.stream_arn)
2019-10-31 15:44:26 +00:00
shard_id = resp["StreamDescription"]["Shards"][0]["ShardId"]
resp = conn.get_shard_iterator(
StreamArn=self.stream_arn,
ShardId=shard_id,
2019-10-31 15:44:26 +00:00
ShardIteratorType="AFTER_SEQUENCE_NUMBER",
SequenceNumber=resp["StreamDescription"]["Shards"][0][
"SequenceNumberRange"
]["StartingSequenceNumber"],
)
2019-10-31 15:44:26 +00:00
assert "ShardIterator" in resp
def test_get_records_empty(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodbstreams", region_name="us-east-1")
resp = conn.describe_stream(StreamArn=self.stream_arn)
2019-10-31 15:44:26 +00:00
shard_id = resp["StreamDescription"]["Shards"][0]["ShardId"]
resp = conn.get_shard_iterator(
2019-10-31 15:44:26 +00:00
StreamArn=self.stream_arn, ShardId=shard_id, ShardIteratorType="LATEST"
)
2019-10-31 15:44:26 +00:00
iterator_id = resp["ShardIterator"]
resp = conn.get_records(ShardIterator=iterator_id)
2019-10-31 15:44:26 +00:00
assert "Records" in resp
assert len(resp["Records"]) == 0
def test_get_records_seq(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodb", region_name="us-east-1")
conn.put_item(
2019-10-31 15:44:26 +00:00
TableName="test-streams",
2020-04-06 09:55:54 +00:00
Item={"id": {"S": "entry1"}, "first_col": {"S": "foo"}},
)
conn.put_item(
2019-10-31 15:44:26 +00:00
TableName="test-streams",
Item={
2019-10-31 15:44:26 +00:00
"id": {"S": "entry1"},
"first_col": {"S": "bar"},
"second_col": {"S": "baz"},
2020-04-06 09:55:54 +00:00
"a": {"L": [{"M": {"b": {"S": "bar1"}}}]},
2019-10-31 15:44:26 +00:00
},
)
2019-10-31 15:44:26 +00:00
conn.delete_item(TableName="test-streams", Key={"id": {"S": "entry1"}})
conn = boto3.client("dynamodbstreams", region_name="us-east-1")
resp = conn.describe_stream(StreamArn=self.stream_arn)
2019-10-31 15:44:26 +00:00
shard_id = resp["StreamDescription"]["Shards"][0]["ShardId"]
resp = conn.get_shard_iterator(
StreamArn=self.stream_arn,
ShardId=shard_id,
2019-10-31 15:44:26 +00:00
ShardIteratorType="TRIM_HORIZON",
)
2019-10-31 15:44:26 +00:00
iterator_id = resp["ShardIterator"]
resp = conn.get_records(ShardIterator=iterator_id)
2019-10-31 15:44:26 +00:00
assert len(resp["Records"]) == 3
assert resp["Records"][0]["eventName"] == "INSERT"
assert resp["Records"][1]["eventName"] == "MODIFY"
assert resp["Records"][2]["eventName"] == "REMOVE"
2019-10-31 15:44:26 +00:00
sequence_number_modify = resp["Records"][1]["dynamodb"]["SequenceNumber"]
# now try fetching from the next shard iterator, it should be
# empty
2019-10-31 15:44:26 +00:00
resp = conn.get_records(ShardIterator=resp["NextShardIterator"])
assert len(resp["Records"]) == 0
2018-11-08 18:22:24 +00:00
2019-07-30 02:21:02 +00:00
# check that if we get the shard iterator AT_SEQUENCE_NUMBER will get the MODIFY event
resp = conn.get_shard_iterator(
StreamArn=self.stream_arn,
ShardId=shard_id,
2019-10-31 15:44:26 +00:00
ShardIteratorType="AT_SEQUENCE_NUMBER",
SequenceNumber=sequence_number_modify,
)
2019-10-31 15:44:26 +00:00
iterator_id = resp["ShardIterator"]
resp = conn.get_records(ShardIterator=iterator_id)
2019-10-31 15:44:26 +00:00
assert len(resp["Records"]) == 2
assert resp["Records"][0]["eventName"] == "MODIFY"
assert resp["Records"][1]["eventName"] == "REMOVE"
2019-07-30 02:21:02 +00:00
# check that if we get the shard iterator AFTER_SEQUENCE_NUMBER will get the DELETE event
resp = conn.get_shard_iterator(
StreamArn=self.stream_arn,
ShardId=shard_id,
2019-10-31 15:44:26 +00:00
ShardIteratorType="AFTER_SEQUENCE_NUMBER",
SequenceNumber=sequence_number_modify,
)
2019-10-31 15:44:26 +00:00
iterator_id = resp["ShardIterator"]
resp = conn.get_records(ShardIterator=iterator_id)
2019-10-31 15:44:26 +00:00
assert len(resp["Records"]) == 1
assert resp["Records"][0]["eventName"] == "REMOVE"
2018-11-08 18:22:24 +00:00
2019-10-31 15:44:26 +00:00
class TestEdges:
2018-11-08 18:22:24 +00:00
mocks = []
2022-10-28 00:37:11 +00:00
def setup_method(self):
2024-01-07 12:03:33 +00:00
self.mock = mock_aws()
self.mock.start()
2018-11-08 18:22:24 +00:00
2022-10-28 00:37:11 +00:00
def teardown_method(self):
2024-01-07 12:03:33 +00:00
self.mock.stop()
2018-11-08 18:22:24 +00:00
def test_enable_stream_on_table(self):
2019-10-31 15:44:26 +00:00
conn = boto3.client("dynamodb", region_name="us-east-1")
2018-11-08 18:22:24 +00:00
resp = conn.create_table(
2019-10-31 15:44:26 +00:00
TableName="test-streams",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
2018-11-08 18:22:24 +00:00
)
2019-10-31 15:44:26 +00:00
assert "StreamSpecification" not in resp["TableDescription"]
2018-11-08 18:22:24 +00:00
resp = conn.update_table(
2019-10-31 15:44:26 +00:00
TableName="test-streams",
StreamSpecification={"StreamViewType": "KEYS_ONLY", "StreamEnabled": True},
2018-11-08 18:22:24 +00:00
)
2019-10-31 15:44:26 +00:00
assert "StreamSpecification" in resp["TableDescription"]
assert resp["TableDescription"]["StreamSpecification"] == {
"StreamEnabled": True,
"StreamViewType": "KEYS_ONLY",
2018-11-08 18:22:24 +00:00
}
2019-10-31 15:44:26 +00:00
assert "LatestStreamLabel" in resp["TableDescription"]
2018-11-08 18:22:24 +00:00
# now try to enable it again
with pytest.raises(conn.exceptions.ResourceInUseException):
2018-11-08 18:22:24 +00:00
resp = conn.update_table(
2019-10-31 15:44:26 +00:00
TableName="test-streams",
2019-11-21 22:53:58 +00:00
StreamSpecification={
"StreamViewType": "OLD_IMAGES",
"StreamEnabled": True,
},
2018-11-08 18:22:24 +00:00
)
2019-10-31 15:44:26 +00:00
2018-11-08 18:22:24 +00:00
def test_stream_with_range_key(self):
2019-10-31 15:44:26 +00:00
dyn = boto3.client("dynamodb", region_name="us-east-1")
2018-11-08 18:22:24 +00:00
resp = dyn.create_table(
2019-10-31 15:44:26 +00:00
TableName="test-streams",
KeySchema=[
{"AttributeName": "id", "KeyType": "HASH"},
{"AttributeName": "color", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
{"AttributeName": "color", "AttributeType": "S"},
],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
StreamSpecification={"StreamViewType": "NEW_IMAGES", "StreamEnabled": True},
2018-11-08 18:22:24 +00:00
)
2019-10-31 15:44:26 +00:00
stream_arn = resp["TableDescription"]["LatestStreamArn"]
2018-11-08 18:22:24 +00:00
2019-10-31 15:44:26 +00:00
streams = boto3.client("dynamodbstreams", region_name="us-east-1")
2018-11-08 18:22:24 +00:00
resp = streams.describe_stream(StreamArn=stream_arn)
2019-10-31 15:44:26 +00:00
shard_id = resp["StreamDescription"]["Shards"][0]["ShardId"]
2018-11-08 18:22:24 +00:00
resp = streams.get_shard_iterator(
2019-10-31 15:44:26 +00:00
StreamArn=stream_arn, ShardId=shard_id, ShardIteratorType="LATEST"
2018-11-08 18:22:24 +00:00
)
2019-10-31 15:44:26 +00:00
iterator_id = resp["ShardIterator"]
2018-11-08 18:22:24 +00:00
dyn.put_item(
2019-10-31 15:44:26 +00:00
TableName="test-streams", Item={"id": {"S": "row1"}, "color": {"S": "blue"}}
2018-11-08 18:22:24 +00:00
)
dyn.put_item(
2019-10-31 15:44:26 +00:00
TableName="test-streams",
Item={"id": {"S": "row2"}, "color": {"S": "green"}},
2018-11-08 18:22:24 +00:00
)
resp = streams.get_records(ShardIterator=iterator_id)
2019-10-31 15:44:26 +00:00
assert len(resp["Records"]) == 2
assert resp["Records"][0]["eventName"] == "INSERT"
assert resp["Records"][1]["eventName"] == "INSERT"