moto/tests/test_dynamodbstreams/test_dynamodbstreams.py

291 lines
10 KiB
Python
Raw Normal View History

from __future__ import unicode_literals, print_function
2018-11-08 18:22:24 +00:00
from nose.tools import assert_raises
import boto3
from moto import mock_dynamodb2, mock_dynamodbstreams
2018-11-08 18:22:24 +00:00
class TestCore():
    """DynamoDB Streams tests against a table that is created with a stream.

    ``setup`` starts the dynamodb2 and dynamodbstreams mocks and creates the
    ``test-streams`` table with a ``NEW_AND_OLD_IMAGES`` stream; ``teardown``
    deletes the table and stops the mocks again.
    """

    # ARN of the stream attached to the table created in ``setup``.
    stream_arn = None
    # Started mocks; rebound to a fresh per-instance list in ``setup``.
    mocks = []

    def setup(self):
        """Start the mocks and create the ``test-streams`` table."""
        self.mocks = [mock_dynamodb2(), mock_dynamodbstreams()]
        for m in self.mocks:
            m.start()

        try:
            # create a table with a stream
            conn = boto3.client('dynamodb', region_name='us-east-1')
            resp = conn.create_table(
                TableName='test-streams',
                KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
                AttributeDefinitions=[{'AttributeName': 'id',
                                       'AttributeType': 'S'}],
                ProvisionedThroughput={'ReadCapacityUnits': 1,
                                       'WriteCapacityUnits': 1},
                StreamSpecification={
                    'StreamEnabled': True,
                    'StreamViewType': 'NEW_AND_OLD_IMAGES'
                }
            )
            self.stream_arn = resp['TableDescription']['LatestStreamArn']
        except Exception:
            # Do not leave the mocks running if the table could not be
            # created; re-raise so the failure is still reported.
            self._stop_mocks()
            raise

    def teardown(self):
        """Delete the table and stop the mocks, even if deletion fails."""
        try:
            conn = boto3.client('dynamodb', region_name='us-east-1')
            conn.delete_table(TableName='test-streams')
        finally:
            # Always reset state and stop the mocks so that a failing
            # delete_table cannot leave them running.
            self.stream_arn = None
            self._stop_mocks()

    def _stop_mocks(self):
        """Stop every mock in ``self.mocks``, in the order they were started."""
        for m in self.mocks:
            m.stop()

    def _first_shard(self, streams):
        """Return the first shard description of the stream under test.

        ``streams`` is a boto3 ``dynamodbstreams`` client.
        """
        resp = streams.describe_stream(StreamArn=self.stream_arn)
        return resp['StreamDescription']['Shards'][0]

    def _shard_iterator(self, streams, shard_id, iterator_type,
                        sequence_number=None):
        """Call ``get_shard_iterator`` for the stream under test.

        ``SequenceNumber`` is only sent when ``sequence_number`` is given.
        Returns the raw ``get_shard_iterator`` response.
        """
        kwargs = {
            'StreamArn': self.stream_arn,
            'ShardId': shard_id,
            'ShardIteratorType': iterator_type,
        }
        if sequence_number is not None:
            kwargs['SequenceNumber'] = sequence_number
        return streams.get_shard_iterator(**kwargs)

    def test_verify_stream(self):
        """describe_table reports a stream ARN for the table."""
        conn = boto3.client('dynamodb', region_name='us-east-1')
        resp = conn.describe_table(TableName='test-streams')
        assert 'LatestStreamArn' in resp['Table']

    def test_describe_stream(self):
        """describe_stream returns the ARN and table name of the stream."""
        conn = boto3.client('dynamodbstreams', region_name='us-east-1')

        resp = conn.describe_stream(StreamArn=self.stream_arn)
        assert 'StreamDescription' in resp
        desc = resp['StreamDescription']
        assert desc['StreamArn'] == self.stream_arn
        assert desc['TableName'] == 'test-streams'

    def test_list_streams(self):
        """list_streams returns the stream, and nothing for another table."""
        conn = boto3.client('dynamodbstreams', region_name='us-east-1')

        resp = conn.list_streams()
        assert resp['Streams'][0]['StreamArn'] == self.stream_arn

        resp = conn.list_streams(TableName='no-stream')
        assert not resp['Streams']

    def test_get_shard_iterator(self):
        """A TRIM_HORIZON iterator can be requested for the first shard."""
        conn = boto3.client('dynamodbstreams', region_name='us-east-1')

        shard = self._first_shard(conn)
        resp = self._shard_iterator(conn, shard['ShardId'], 'TRIM_HORIZON')
        assert 'ShardIterator' in resp

    def test_get_shard_iterator_at_sequence_number(self):
        """An AT_SEQUENCE_NUMBER iterator can be requested."""
        conn = boto3.client('dynamodbstreams', region_name='us-east-1')

        shard = self._first_shard(conn)
        resp = self._shard_iterator(
            conn,
            shard['ShardId'],
            'AT_SEQUENCE_NUMBER',
            shard['SequenceNumberRange']['StartingSequenceNumber']
        )
        assert 'ShardIterator' in resp

    def test_get_shard_iterator_after_sequence_number(self):
        """An AFTER_SEQUENCE_NUMBER iterator can be requested."""
        conn = boto3.client('dynamodbstreams', region_name='us-east-1')

        shard = self._first_shard(conn)
        resp = self._shard_iterator(
            conn,
            shard['ShardId'],
            'AFTER_SEQUENCE_NUMBER',
            shard['SequenceNumberRange']['StartingSequenceNumber']
        )
        assert 'ShardIterator' in resp

    def test_get_records_empty(self):
        """A LATEST iterator on an untouched table yields no records."""
        conn = boto3.client('dynamodbstreams', region_name='us-east-1')

        shard_id = self._first_shard(conn)['ShardId']
        resp = self._shard_iterator(conn, shard_id, 'LATEST')
        iterator_id = resp['ShardIterator']

        resp = conn.get_records(ShardIterator=iterator_id)
        assert 'Records' in resp
        assert len(resp['Records']) == 0

    def test_get_records_seq(self):
        """Put, overwrite and delete show up as INSERT, MODIFY, DELETE."""
        conn = boto3.client('dynamodb', region_name='us-east-1')

        conn.put_item(
            TableName='test-streams',
            Item={
                'id': {'S': 'entry1'},
                'first_col': {'S': 'foo'}
            }
        )
        conn.put_item(
            TableName='test-streams',
            Item={
                'id': {'S': 'entry1'},
                'first_col': {'S': 'bar'},
                'second_col': {'S': 'baz'}
            }
        )
        conn.delete_item(
            TableName='test-streams',
            Key={'id': {'S': 'entry1'}}
        )

        conn = boto3.client('dynamodbstreams', region_name='us-east-1')

        shard_id = self._first_shard(conn)['ShardId']
        resp = self._shard_iterator(conn, shard_id, 'TRIM_HORIZON')
        iterator_id = resp['ShardIterator']

        resp = conn.get_records(ShardIterator=iterator_id)
        assert len(resp['Records']) == 3
        assert resp['Records'][0]['eventName'] == 'INSERT'
        assert resp['Records'][1]['eventName'] == 'MODIFY'
        assert resp['Records'][2]['eventName'] == 'DELETE'

        sequence_number_modify = resp['Records'][1]['dynamodb']['SequenceNumber']

        # now try fetching from the next shard iterator, it should be
        # empty
        resp = conn.get_records(ShardIterator=resp['NextShardIterator'])
        assert len(resp['Records']) == 0

        # check that an AT_SEQUENCE_NUMBER iterator starts at the MODIFY
        # event itself
        resp = self._shard_iterator(
            conn, shard_id, 'AT_SEQUENCE_NUMBER', sequence_number_modify
        )
        iterator_id = resp['ShardIterator']
        resp = conn.get_records(ShardIterator=iterator_id)
        assert len(resp['Records']) == 2
        assert resp['Records'][0]['eventName'] == 'MODIFY'
        assert resp['Records'][1]['eventName'] == 'DELETE'

        # check that an AFTER_SEQUENCE_NUMBER iterator starts just past the
        # MODIFY event, i.e. at the DELETE event
        resp = self._shard_iterator(
            conn, shard_id, 'AFTER_SEQUENCE_NUMBER', sequence_number_modify
        )
        iterator_id = resp['ShardIterator']
        resp = conn.get_records(ShardIterator=iterator_id)
        assert len(resp['Records']) == 1
        assert resp['Records'][0]['eventName'] == 'DELETE'
2018-11-08 18:22:24 +00:00
class TestEdges():
    """Edge cases: enabling a stream after creation, and range-key tables.

    Each test creates its own ``test-streams`` table; ``setup`` and
    ``teardown`` only start and stop the dynamodb2/dynamodbstreams mocks.
    """

    # Started mocks; rebound to a fresh per-instance list in ``setup``.
    mocks = []

    def setup(self):
        """Start the dynamodb2 and dynamodbstreams mocks."""
        self.mocks = [mock_dynamodb2(), mock_dynamodbstreams()]
        for m in self.mocks:
            m.start()

    def teardown(self):
        """Stop the mocks started in ``setup``."""
        for m in self.mocks:
            m.stop()

    def test_enable_stream_on_table(self):
        """A stream can be enabled via update_table, but only once."""
        conn = boto3.client('dynamodb', region_name='us-east-1')
        resp = conn.create_table(
            TableName='test-streams',
            KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'id',
                                   'AttributeType': 'S'}],
            ProvisionedThroughput={'ReadCapacityUnits': 1,
                                   'WriteCapacityUnits': 1}
        )
        assert 'StreamSpecification' not in resp['TableDescription']

        # StreamEnabled is a required member of StreamSpecification in the
        # DynamoDB API, so send it explicitly.
        resp = conn.update_table(
            TableName='test-streams',
            StreamSpecification={
                'StreamEnabled': True,
                'StreamViewType': 'KEYS_ONLY'
            }
        )
        assert 'StreamSpecification' in resp['TableDescription']
        assert resp['TableDescription']['StreamSpecification'] == {
            'StreamEnabled': True,
            'StreamViewType': 'KEYS_ONLY'
        }
        assert 'LatestStreamLabel' in resp['TableDescription']

        # now try to enable it again
        with assert_raises(conn.exceptions.ResourceInUseException):
            conn.update_table(
                TableName='test-streams',
                StreamSpecification={
                    'StreamEnabled': True,
                    # valid API value is OLD_IMAGE (singular)
                    'StreamViewType': 'OLD_IMAGE'
                }
            )

    def test_stream_with_range_key(self):
        """Items that differ in hash and range key each emit an INSERT."""
        dyn = boto3.client('dynamodb', region_name='us-east-1')

        resp = dyn.create_table(
            TableName='test-streams',
            KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'},
                       {'AttributeName': 'color', 'KeyType': 'RANGE'}],
            AttributeDefinitions=[{'AttributeName': 'id',
                                   'AttributeType': 'S'},
                                  {'AttributeName': 'color',
                                   'AttributeType': 'S'}],
            ProvisionedThroughput={'ReadCapacityUnits': 1,
                                   'WriteCapacityUnits': 1},
            StreamSpecification={
                'StreamEnabled': True,
                # valid API value is NEW_IMAGE (singular)
                'StreamViewType': 'NEW_IMAGE'
            }
        )
        stream_arn = resp['TableDescription']['LatestStreamArn']

        streams = boto3.client('dynamodbstreams', region_name='us-east-1')
        resp = streams.describe_stream(StreamArn=stream_arn)
        shard_id = resp['StreamDescription']['Shards'][0]['ShardId']

        # Take the LATEST iterator before writing; the assertions below
        # expect it to return both subsequent puts.
        resp = streams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard_id,
            ShardIteratorType='LATEST'
        )
        iterator_id = resp['ShardIterator']

        dyn.put_item(
            TableName='test-streams',
            Item={'id': {'S': 'row1'}, 'color': {'S': 'blue'}}
        )
        dyn.put_item(
            TableName='test-streams',
            Item={'id': {'S': 'row2'}, 'color': {'S': 'green'}}
        )

        resp = streams.get_records(ShardIterator=iterator_id)
        assert len(resp['Records']) == 2
        assert resp['Records'][0]['eventName'] == 'INSERT'
        assert resp['Records'][1]['eventName'] == 'INSERT'