diff --git a/moto/dynamodb2/models.py b/moto/dynamodb2/models.py index a54c4f7d0..16e97ea2f 100644 --- a/moto/dynamodb2/models.py +++ b/moto/dynamodb2/models.py @@ -294,7 +294,7 @@ class Item(BaseModel): class Table(BaseModel): - def __init__(self, table_name, schema=None, attr=None, throughput=None, indexes=None, global_indexes=None): + def __init__(self, table_name, schema=None, attr=None, throughput=None, indexes=None, global_indexes=None, streams=None): self.name = table_name self.attr = attr self.schema = schema @@ -325,10 +325,18 @@ class Table(BaseModel): 'TimeToLiveStatus': 'DISABLED' # One of 'ENABLING'|'DISABLING'|'ENABLED'|'DISABLED', # 'AttributeName': 'string' # Can contain this } + self.set_stream_specification(streams) def _generate_arn(self, name): return 'arn:aws:dynamodb:us-east-1:123456789011:table/' + name + def set_stream_specification(self, streams): + self.stream_specification = streams + if streams and streams.get('StreamEnabled'): + self.latest_stream_label = datetime.datetime.utcnow().isoformat() + else: + self.latest_stream_label = None + def describe(self, base_key='TableDescription'): results = { base_key: { @@ -345,6 +353,11 @@ class Table(BaseModel): 'LocalSecondaryIndexes': [index for index in self.indexes], } } + if self.stream_specification: + results[base_key]['StreamSpecification'] = self.stream_specification + if self.latest_stream_label: + results[base_key]['LatestStreamLabel'] = self.latest_stream_label + results[base_key]['LatestStreamArn'] = self.table_arn + '/stream/' + self.latest_stream_label return results def __len__(self): @@ -680,6 +693,13 @@ class DynamoDBBackend(BaseBackend): table.throughput = throughput return table + def update_table_streams(self, name, stream_specification): + table = self.tables[name] + if stream_specification['StreamEnabled'] and table.latest_stream_label: + raise ValueError('Table already has stream enabled') + table.set_stream_specification(stream_specification) + return table + def update_table_global_indexes(self, name, global_index_updates): table = self.tables[name] gsis_by_name = dict((i['IndexName'], i) for i in table.global_indexes) diff --git a/moto/dynamodb2/responses.py b/moto/dynamodb2/responses.py index e2f1ef1cc..73bd3ae38 100644 --- a/moto/dynamodb2/responses.py +++ b/moto/dynamodb2/responses.py @@ -104,13 +104,16 @@ class DynamoHandler(BaseResponse): # getting the indexes global_indexes = body.get("GlobalSecondaryIndexes", []) local_secondary_indexes = body.get("LocalSecondaryIndexes", []) + # get the stream specification + streams = body.get("StreamSpecification") table = self.dynamodb_backend.create_table(table_name, schema=key_schema, throughput=throughput, attr=attr, global_indexes=global_indexes, - indexes=local_secondary_indexes) + indexes=local_secondary_indexes, + streams=streams) if table is not None: return dynamo_json_dump(table.describe()) else: @@ -163,12 +166,16 @@ class DynamoHandler(BaseResponse): def update_table(self): name = self.body['TableName'] + table = self.dynamodb_backend.get_table(name) if 'GlobalSecondaryIndexUpdates' in self.body: table = self.dynamodb_backend.update_table_global_indexes( name, self.body['GlobalSecondaryIndexUpdates']) if 'ProvisionedThroughput' in self.body: throughput = self.body["ProvisionedThroughput"] table = self.dynamodb_backend.update_table_throughput(name, throughput) + if 'StreamSpecification' in self.body: + table = self.dynamodb_backend.update_table_streams(name, self.body['StreamSpecification']) + return dynamo_json_dump(table.describe()) def describe_table(self): diff --git a/tests/test_dynamodb2/test_dynamodb.py b/tests/test_dynamodb2/test_dynamodb.py index afc919dd7..7f30bbccf 100644 --- a/tests/test_dynamodb2/test_dynamodb.py +++ b/tests/test_dynamodb2/test_dynamodb.py @@ -1336,3 +1336,62 @@ def test_query_global_secondary_index_when_created_via_update_table_resource(): assert len(forum_and_subject_items) == 1 assert forum_and_subject_items[0] == {'user_id': Decimal('1'), 'forum_name': 'cats', 'subject': 'my pet is the cutest'} + + +@mock_dynamodb2 +def test_dynamodb_streams_1(): + conn = boto3.client('dynamodb', region_name='us-east-1') + + resp = conn.create_table( + TableName='test-streams', + KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}], + AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}], + ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1}, + StreamSpecification={ + 'StreamEnabled': True, + 'StreamViewType': 'NEW_AND_OLD_IMAGES' + } + ) + + assert 'StreamSpecification' in resp['TableDescription'] + assert resp['TableDescription']['StreamSpecification'] == { + 'StreamEnabled': True, + 'StreamViewType': 'NEW_AND_OLD_IMAGES' + } + assert 'LatestStreamLabel' in resp['TableDescription'] + assert 'LatestStreamArn' in resp['TableDescription'] + + resp = conn.delete_table(TableName='test-streams') + + assert 'StreamSpecification' in resp['TableDescription'] + + +@mock_dynamodb2 +def test_dynamodb_streams_2(): + conn = boto3.client('dynamodb', region_name='us-east-1') + + resp = conn.create_table( + TableName='test-stream-update', + KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}], + AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}], + ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1}, + ) + + assert 'StreamSpecification' not in resp['TableDescription'] + + resp = conn.update_table( + TableName='test-stream-update', + StreamSpecification={ + 'StreamEnabled': True, + 'StreamViewType': 'NEW_IMAGE' + } + ) + + assert 'StreamSpecification' in resp['TableDescription'] + assert resp['TableDescription']['StreamSpecification'] == { + 'StreamEnabled': True, + 'StreamViewType': 'NEW_IMAGE' + } + assert 'LatestStreamLabel' in resp['TableDescription'] + assert 'LatestStreamArn' in resp['TableDescription'] +