Add StreamSpecification to dynamodb2 package

This commit is contained in:
Karl Gutwin 2018-11-07 15:03:25 -05:00
parent 90a62b5640
commit 0b57ffe26a
3 changed files with 88 additions and 2 deletions

View File

@ -294,7 +294,7 @@ class Item(BaseModel):
class Table(BaseModel): class Table(BaseModel):
def __init__(self, table_name, schema=None, attr=None, throughput=None, indexes=None, global_indexes=None): def __init__(self, table_name, schema=None, attr=None, throughput=None, indexes=None, global_indexes=None, streams=None):
self.name = table_name self.name = table_name
self.attr = attr self.attr = attr
self.schema = schema self.schema = schema
@ -325,10 +325,18 @@ class Table(BaseModel):
'TimeToLiveStatus': 'DISABLED' # One of 'ENABLING'|'DISABLING'|'ENABLED'|'DISABLED', 'TimeToLiveStatus': 'DISABLED' # One of 'ENABLING'|'DISABLING'|'ENABLED'|'DISABLED',
# 'AttributeName': 'string' # Can contain this # 'AttributeName': 'string' # Can contain this
} }
self.set_stream_specification(streams)
def _generate_arn(self, name): def _generate_arn(self, name):
return 'arn:aws:dynamodb:us-east-1:123456789011:table/' + name return 'arn:aws:dynamodb:us-east-1:123456789011:table/' + name
def set_stream_specification(self, streams):
self.stream_specification = streams
if streams and streams.get('StreamEnabled'):
self.latest_stream_label = datetime.datetime.utcnow().isoformat()
else:
self.latest_stream_label = None
def describe(self, base_key='TableDescription'): def describe(self, base_key='TableDescription'):
results = { results = {
base_key: { base_key: {
@ -345,6 +353,11 @@ class Table(BaseModel):
'LocalSecondaryIndexes': [index for index in self.indexes], 'LocalSecondaryIndexes': [index for index in self.indexes],
} }
} }
if self.stream_specification:
results[base_key]['StreamSpecification'] = self.stream_specification
if self.latest_stream_label:
results[base_key]['LatestStreamLabel'] = self.latest_stream_label
results[base_key]['LatestStreamArn'] = self.table_arn + '/stream/' + self.latest_stream_label
return results return results
def __len__(self): def __len__(self):
@ -680,6 +693,13 @@ class DynamoDBBackend(BaseBackend):
table.throughput = throughput table.throughput = throughput
return table return table
def update_table_streams(self, name, stream_specification):
table = self.tables[name]
if stream_specification['StreamEnabled'] and table.latest_stream_label:
raise ValueError('Table already has stream enabled')
table.set_stream_specification(stream_specification)
return table
def update_table_global_indexes(self, name, global_index_updates): def update_table_global_indexes(self, name, global_index_updates):
table = self.tables[name] table = self.tables[name]
gsis_by_name = dict((i['IndexName'], i) for i in table.global_indexes) gsis_by_name = dict((i['IndexName'], i) for i in table.global_indexes)

View File

@ -104,13 +104,16 @@ class DynamoHandler(BaseResponse):
# getting the indexes # getting the indexes
global_indexes = body.get("GlobalSecondaryIndexes", []) global_indexes = body.get("GlobalSecondaryIndexes", [])
local_secondary_indexes = body.get("LocalSecondaryIndexes", []) local_secondary_indexes = body.get("LocalSecondaryIndexes", [])
# get the stream specification
streams = body.get("StreamSpecification")
table = self.dynamodb_backend.create_table(table_name, table = self.dynamodb_backend.create_table(table_name,
schema=key_schema, schema=key_schema,
throughput=throughput, throughput=throughput,
attr=attr, attr=attr,
global_indexes=global_indexes, global_indexes=global_indexes,
indexes=local_secondary_indexes) indexes=local_secondary_indexes,
streams=streams)
if table is not None: if table is not None:
return dynamo_json_dump(table.describe()) return dynamo_json_dump(table.describe())
else: else:
@ -163,12 +166,16 @@ class DynamoHandler(BaseResponse):
def update_table(self): def update_table(self):
name = self.body['TableName'] name = self.body['TableName']
table = self.dynamodb_backend.get_table(name)
if 'GlobalSecondaryIndexUpdates' in self.body: if 'GlobalSecondaryIndexUpdates' in self.body:
table = self.dynamodb_backend.update_table_global_indexes( table = self.dynamodb_backend.update_table_global_indexes(
name, self.body['GlobalSecondaryIndexUpdates']) name, self.body['GlobalSecondaryIndexUpdates'])
if 'ProvisionedThroughput' in self.body: if 'ProvisionedThroughput' in self.body:
throughput = self.body["ProvisionedThroughput"] throughput = self.body["ProvisionedThroughput"]
table = self.dynamodb_backend.update_table_throughput(name, throughput) table = self.dynamodb_backend.update_table_throughput(name, throughput)
if 'StreamSpecification' in self.body:
table = self.dynamodb_backend.update_table_streams(name, self.body['StreamSpecification'])
return dynamo_json_dump(table.describe()) return dynamo_json_dump(table.describe())
def describe_table(self): def describe_table(self):

View File

@ -1336,3 +1336,62 @@ def test_query_global_secondary_index_when_created_via_update_table_resource():
assert len(forum_and_subject_items) == 1 assert len(forum_and_subject_items) == 1
assert forum_and_subject_items[0] == {'user_id': Decimal('1'), 'forum_name': 'cats', assert forum_and_subject_items[0] == {'user_id': Decimal('1'), 'forum_name': 'cats',
'subject': 'my pet is the cutest'} 'subject': 'my pet is the cutest'}
@mock_dynamodb2
def test_dynamodb_streams_1():
conn = boto3.client('dynamodb', region_name='us-east-1')
resp = conn.create_table(
TableName='test-streams',
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1},
StreamSpecification={
'StreamEnabled': True,
'StreamViewType': 'NEW_AND_OLD_IMAGES'
}
)
assert 'StreamSpecification' in resp['TableDescription']
assert resp['TableDescription']['StreamSpecification'] == {
'StreamEnabled': True,
'StreamViewType': 'NEW_AND_OLD_IMAGES'
}
assert 'LatestStreamLabel' in resp['TableDescription']
assert 'LatestStreamArn' in resp['TableDescription']
resp = conn.delete_table(TableName='test-streams')
assert 'StreamSpecification' in resp['TableDescription']
@mock_dynamodb2
def test_dynamodb_streams_2():
conn = boto3.client('dynamodb', region_name='us-east-1')
resp = conn.create_table(
TableName='test-stream-update',
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1},
)
assert 'StreamSpecification' not in resp['TableDescription']
resp = conn.update_table(
TableName='test-stream-update',
StreamSpecification={
'StreamEnabled': True,
'StreamViewType': 'NEW_IMAGE'
}
)
assert 'StreamSpecification' in resp['TableDescription']
assert resp['TableDescription']['StreamSpecification'] == {
'StreamEnabled': True,
'StreamViewType': 'NEW_IMAGE'
}
assert 'LatestStreamLabel' in resp['TableDescription']
assert 'LatestStreamArn' in resp['TableDescription']