From 9d190aa04e4c66831449a1a5cd3673c35e995bf9 Mon Sep 17 00:00:00 2001 From: Karl Gutwin Date: Thu, 8 Nov 2018 13:22:24 -0500 Subject: [PATCH] Tweak functionality and add tests --- moto/dynamodb2/models.py | 17 ++-- .../test_dynamodbstreams.py | 99 ++++++++++++++++++- 2 files changed, 108 insertions(+), 8 deletions(-) diff --git a/moto/dynamodb2/models.py b/moto/dynamodb2/models.py index e2882f1e4..d15b9fce5 100644 --- a/moto/dynamodb2/models.py +++ b/moto/dynamodb2/models.py @@ -406,10 +406,13 @@ class Table(BaseModel): def set_stream_specification(self, streams): self.stream_specification = streams - if streams and streams.get('StreamEnabled'): + if streams and (streams.get('StreamEnabled') + or streams.get('StreamViewType')): + self.stream_specification['StreamEnabled'] = True self.latest_stream_label = datetime.datetime.utcnow().isoformat() self.stream_shard = StreamShard(self) else: + self.stream_specification = {'StreamEnabled': False} self.latest_stream_label = None self.stream_shard = None @@ -429,11 +432,11 @@ class Table(BaseModel): 'LocalSecondaryIndexes': [index for index in self.indexes], } } - if self.stream_specification: + if self.stream_specification and self.stream_specification['StreamEnabled']: results[base_key]['StreamSpecification'] = self.stream_specification - if self.latest_stream_label: - results[base_key]['LatestStreamLabel'] = self.latest_stream_label - results[base_key]['LatestStreamArn'] = self.table_arn + '/stream/' + self.latest_stream_label + if self.latest_stream_label: + results[base_key]['LatestStreamLabel'] = self.latest_stream_label + results[base_key]['LatestStreamArn'] = self.table_arn + '/stream/' + self.latest_stream_label return results def __len__(self): @@ -779,7 +782,9 @@ class DynamoDBBackend(BaseBackend): def update_table_streams(self, name, stream_specification): table = self.tables[name] - if stream_specification['StreamEnabled'] and table.latest_stream_label: + if ((stream_specification.get('StreamEnabled') + or stream_specification.get('StreamViewType')) + and table.latest_stream_label): raise ValueError('Table already has stream enabled') table.set_stream_specification(stream_specification) return table diff --git a/tests/test_dynamodbstreams/test_dynamodbstreams.py b/tests/test_dynamodbstreams/test_dynamodbstreams.py index 94c0c51b2..7e4025626 100644 --- a/tests/test_dynamodbstreams/test_dynamodbstreams.py +++ b/tests/test_dynamodbstreams/test_dynamodbstreams.py @@ -1,10 +1,12 @@ from __future__ import unicode_literals, print_function +from nose.tools import assert_raises + import boto3 from moto import mock_dynamodb2, mock_dynamodbstreams -class TestClass(): +class TestCore(): stream_arn = None mocks = [] @@ -84,7 +86,7 @@ class TestClass(): resp = conn.get_shard_iterator( StreamArn=self.stream_arn, ShardId=shard_id, - ShardIteratorType='TRIM_HORIZON' + ShardIteratorType='LATEST' ) iterator_id = resp['ShardIterator'] @@ -137,3 +139,96 @@ class TestClass(): # empty resp = conn.get_records(ShardIterator=resp['NextShardIterator']) assert len(resp['Records']) == 0 + + +class TestEdges(): + mocks = [] + + def setup(self): + self.mocks = [mock_dynamodb2(), mock_dynamodbstreams()] + for m in self.mocks: + m.start() + + def teardown(self): + for m in self.mocks: + m.stop() + + + def test_enable_stream_on_table(self): + conn = boto3.client('dynamodb', region_name='us-east-1') + resp = conn.create_table( + TableName='test-streams', + KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}], + AttributeDefinitions=[{'AttributeName': 'id', + 'AttributeType': 'S'}], + ProvisionedThroughput={'ReadCapacityUnits': 1, + 'WriteCapacityUnits': 1} + ) + assert 'StreamSpecification' not in resp['TableDescription'] + + resp = conn.update_table( + TableName='test-streams', + StreamSpecification={ + 'StreamViewType': 'KEYS_ONLY' + } + ) + assert 'StreamSpecification' in resp['TableDescription'] + assert resp['TableDescription']['StreamSpecification'] == { + 'StreamEnabled': True, + 'StreamViewType': 'KEYS_ONLY' + } + assert 'LatestStreamLabel' in resp['TableDescription'] + + # now try to enable it again + with assert_raises(ValueError): + resp = conn.update_table( + TableName='test-streams', + StreamSpecification={ + 'StreamViewType': 'OLD_IMAGES' + } + ) + + def test_stream_with_range_key(self): + dyn = boto3.client('dynamodb', region_name='us-east-1') + + resp = dyn.create_table( + TableName='test-streams', + KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}, + {'AttributeName': 'color', 'KeyType': 'RANGE'}], + AttributeDefinitions=[{'AttributeName': 'id', + 'AttributeType': 'S'}, + {'AttributeName': 'color', + 'AttributeType': 'S'}], + ProvisionedThroughput={'ReadCapacityUnits': 1, + 'WriteCapacityUnits': 1}, + StreamSpecification={ + 'StreamViewType': 'NEW_IMAGES' + } + ) + stream_arn = resp['TableDescription']['LatestStreamArn'] + + streams = boto3.client('dynamodbstreams', region_name='us-east-1') + resp = streams.describe_stream(StreamArn=stream_arn) + shard_id = resp['StreamDescription']['Shards'][0]['ShardId'] + + resp = streams.get_shard_iterator( + StreamArn=stream_arn, + ShardId=shard_id, + ShardIteratorType='LATEST' + ) + iterator_id = resp['ShardIterator'] + + dyn.put_item( + TableName='test-streams', + Item={'id': {'S': 'row1'}, 'color': {'S': 'blue'}} + ) + dyn.put_item( + TableName='test-streams', + Item={'id': {'S': 'row2'}, 'color': {'S': 'green'}} + ) + + resp = streams.get_records(ShardIterator=iterator_id) + assert len(resp['Records']) == 2 + assert resp['Records'][0]['eventName'] == 'INSERT' + assert resp['Records'][1]['eventName'] == 'INSERT' +