Tweak functionality and add tests
This commit is contained in:
parent
ff6a57f443
commit
9d190aa04e
@ -406,10 +406,13 @@ class Table(BaseModel):
|
|||||||
|
|
||||||
def set_stream_specification(self, streams):
|
def set_stream_specification(self, streams):
|
||||||
self.stream_specification = streams
|
self.stream_specification = streams
|
||||||
if streams and streams.get('StreamEnabled'):
|
if streams and (streams.get('StreamEnabled')
|
||||||
|
or streams.get('StreamViewType')):
|
||||||
|
self.stream_specification['StreamEnabled'] = True
|
||||||
self.latest_stream_label = datetime.datetime.utcnow().isoformat()
|
self.latest_stream_label = datetime.datetime.utcnow().isoformat()
|
||||||
self.stream_shard = StreamShard(self)
|
self.stream_shard = StreamShard(self)
|
||||||
else:
|
else:
|
||||||
|
self.stream_specification = {'StreamEnabled': False}
|
||||||
self.latest_stream_label = None
|
self.latest_stream_label = None
|
||||||
self.stream_shard = None
|
self.stream_shard = None
|
||||||
|
|
||||||
@ -429,11 +432,11 @@ class Table(BaseModel):
|
|||||||
'LocalSecondaryIndexes': [index for index in self.indexes],
|
'LocalSecondaryIndexes': [index for index in self.indexes],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if self.stream_specification:
|
if self.stream_specification and self.stream_specification['StreamEnabled']:
|
||||||
results[base_key]['StreamSpecification'] = self.stream_specification
|
results[base_key]['StreamSpecification'] = self.stream_specification
|
||||||
if self.latest_stream_label:
|
if self.latest_stream_label:
|
||||||
results[base_key]['LatestStreamLabel'] = self.latest_stream_label
|
results[base_key]['LatestStreamLabel'] = self.latest_stream_label
|
||||||
results[base_key]['LatestStreamArn'] = self.table_arn + '/stream/' + self.latest_stream_label
|
results[base_key]['LatestStreamArn'] = self.table_arn + '/stream/' + self.latest_stream_label
|
||||||
return results
|
return results
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
@ -779,7 +782,9 @@ class DynamoDBBackend(BaseBackend):
|
|||||||
|
|
||||||
def update_table_streams(self, name, stream_specification):
|
def update_table_streams(self, name, stream_specification):
|
||||||
table = self.tables[name]
|
table = self.tables[name]
|
||||||
if stream_specification['StreamEnabled'] and table.latest_stream_label:
|
if ((stream_specification.get('StreamEnabled')
|
||||||
|
or stream_specification.get('StreamViewType'))
|
||||||
|
and table.latest_stream_label):
|
||||||
raise ValueError('Table already has stream enabled')
|
raise ValueError('Table already has stream enabled')
|
||||||
table.set_stream_specification(stream_specification)
|
table.set_stream_specification(stream_specification)
|
||||||
return table
|
return table
|
||||||
|
@ -1,10 +1,12 @@
|
|||||||
from __future__ import unicode_literals, print_function
|
from __future__ import unicode_literals, print_function
|
||||||
|
|
||||||
|
from nose.tools import assert_raises
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
from moto import mock_dynamodb2, mock_dynamodbstreams
|
from moto import mock_dynamodb2, mock_dynamodbstreams
|
||||||
|
|
||||||
|
|
||||||
class TestClass():
|
class TestCore():
|
||||||
stream_arn = None
|
stream_arn = None
|
||||||
mocks = []
|
mocks = []
|
||||||
|
|
||||||
@ -84,7 +86,7 @@ class TestClass():
|
|||||||
resp = conn.get_shard_iterator(
|
resp = conn.get_shard_iterator(
|
||||||
StreamArn=self.stream_arn,
|
StreamArn=self.stream_arn,
|
||||||
ShardId=shard_id,
|
ShardId=shard_id,
|
||||||
ShardIteratorType='TRIM_HORIZON'
|
ShardIteratorType='LATEST'
|
||||||
)
|
)
|
||||||
iterator_id = resp['ShardIterator']
|
iterator_id = resp['ShardIterator']
|
||||||
|
|
||||||
@ -137,3 +139,96 @@ class TestClass():
|
|||||||
# empty
|
# empty
|
||||||
resp = conn.get_records(ShardIterator=resp['NextShardIterator'])
|
resp = conn.get_records(ShardIterator=resp['NextShardIterator'])
|
||||||
assert len(resp['Records']) == 0
|
assert len(resp['Records']) == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestEdges():
|
||||||
|
mocks = []
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
self.mocks = [mock_dynamodb2(), mock_dynamodbstreams()]
|
||||||
|
for m in self.mocks:
|
||||||
|
m.start()
|
||||||
|
|
||||||
|
def teardown(self):
|
||||||
|
for m in self.mocks:
|
||||||
|
m.stop()
|
||||||
|
|
||||||
|
|
||||||
|
def test_enable_stream_on_table(self):
|
||||||
|
conn = boto3.client('dynamodb', region_name='us-east-1')
|
||||||
|
resp = conn.create_table(
|
||||||
|
TableName='test-streams',
|
||||||
|
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
|
||||||
|
AttributeDefinitions=[{'AttributeName': 'id',
|
||||||
|
'AttributeType': 'S'}],
|
||||||
|
ProvisionedThroughput={'ReadCapacityUnits': 1,
|
||||||
|
'WriteCapacityUnits': 1}
|
||||||
|
)
|
||||||
|
assert 'StreamSpecification' not in resp['TableDescription']
|
||||||
|
|
||||||
|
resp = conn.update_table(
|
||||||
|
TableName='test-streams',
|
||||||
|
StreamSpecification={
|
||||||
|
'StreamViewType': 'KEYS_ONLY'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert 'StreamSpecification' in resp['TableDescription']
|
||||||
|
assert resp['TableDescription']['StreamSpecification'] == {
|
||||||
|
'StreamEnabled': True,
|
||||||
|
'StreamViewType': 'KEYS_ONLY'
|
||||||
|
}
|
||||||
|
assert 'LatestStreamLabel' in resp['TableDescription']
|
||||||
|
|
||||||
|
# now try to enable it again
|
||||||
|
with assert_raises(ValueError):
|
||||||
|
resp = conn.update_table(
|
||||||
|
TableName='test-streams',
|
||||||
|
StreamSpecification={
|
||||||
|
'StreamViewType': 'OLD_IMAGES'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_stream_with_range_key(self):
|
||||||
|
dyn = boto3.client('dynamodb', region_name='us-east-1')
|
||||||
|
|
||||||
|
resp = dyn.create_table(
|
||||||
|
TableName='test-streams',
|
||||||
|
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'},
|
||||||
|
{'AttributeName': 'color', 'KeyType': 'RANGE'}],
|
||||||
|
AttributeDefinitions=[{'AttributeName': 'id',
|
||||||
|
'AttributeType': 'S'},
|
||||||
|
{'AttributeName': 'color',
|
||||||
|
'AttributeType': 'S'}],
|
||||||
|
ProvisionedThroughput={'ReadCapacityUnits': 1,
|
||||||
|
'WriteCapacityUnits': 1},
|
||||||
|
StreamSpecification={
|
||||||
|
'StreamViewType': 'NEW_IMAGES'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
stream_arn = resp['TableDescription']['LatestStreamArn']
|
||||||
|
|
||||||
|
streams = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||||
|
resp = streams.describe_stream(StreamArn=stream_arn)
|
||||||
|
shard_id = resp['StreamDescription']['Shards'][0]['ShardId']
|
||||||
|
|
||||||
|
resp = streams.get_shard_iterator(
|
||||||
|
StreamArn=stream_arn,
|
||||||
|
ShardId=shard_id,
|
||||||
|
ShardIteratorType='LATEST'
|
||||||
|
)
|
||||||
|
iterator_id = resp['ShardIterator']
|
||||||
|
|
||||||
|
dyn.put_item(
|
||||||
|
TableName='test-streams',
|
||||||
|
Item={'id': {'S': 'row1'}, 'color': {'S': 'blue'}}
|
||||||
|
)
|
||||||
|
dyn.put_item(
|
||||||
|
TableName='test-streams',
|
||||||
|
Item={'id': {'S': 'row2'}, 'color': {'S': 'green'}}
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = streams.get_records(ShardIterator=iterator_id)
|
||||||
|
assert len(resp['Records']) == 2
|
||||||
|
assert resp['Records'][0]['eventName'] == 'INSERT'
|
||||||
|
assert resp['Records'][1]['eventName'] == 'INSERT'
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user