Merge pull request #486 from pcraciunoiu/feature/dynamodb-query-index

Add query index support for boto3 via dynamodb2
This commit is contained in:
Steve Pulec 2016-01-08 16:41:55 -05:00
commit d6d6ff56af
4 changed files with 541 additions and 15 deletions

View File

@ -64,7 +64,7 @@ It gets even better! Moto isn't just S3. Here's the status of the other AWS serv
| Data Pipeline | @mock_datapipeline| basic endpoints done |
|------------------------------------------------------------------------------|
| DynamoDB | @mock_dynamodb | core endpoints done |
| DynamoDB2 | @mock_dynamodb2 | core endpoints done - no indexes |
| DynamoDB2 | @mock_dynamodb2 | core endpoints + partial indexes |
|------------------------------------------------------------------------------|
| EC2 | @mock_ec2 | core endpoints done |
| - AMI | | core endpoints done |

View File

@ -278,20 +278,63 @@ class Table(object):
except KeyError:
return None
def query(self, hash_key, range_comparison, range_objs, index_name=None):
    """Return the items matching a hash key and an optional range condition.

    :param hash_key: value compared for equality against each item's hash
        key (the table's own, or the index's when ``index_name`` is given).
    :param range_comparison: comparison operator handed to ``compare`` on
        the range key value; a falsy value disables range filtering.
    :param range_objs: operand values handed to ``compare``.
    :param index_name: optional name of a global or local secondary index
        to query instead of the table's primary key.
    :returns: ``(results, last_page)``; ``last_page`` is currently always
        ``True``.
    :raises ValueError: if the index is unknown, has no HASH key, or a
        range comparison is requested on an index without a RANGE key.
    """
    last_page = True  # Once pagination is implemented, change this

    if not index_name:
        # Plain table query: match on the table's own hash/range keys.
        results = [item for item in self.all_items()
                   if isinstance(item, Item) and item.hash_key == hash_key]
        if range_comparison:
            results = [item for item in results
                       if item.range_key.compare(range_comparison, range_objs)]
        results.sort(key=lambda item: item.range_key)
        return results, last_page

    # list() keeps the concatenation working even when an index collection
    # is stored as a non-list iterable such as a dict view.
    all_indexes = list(self.global_indexes or []) + list(self.indexes or [])
    indexes_by_name = dict((i['IndexName'], i) for i in all_indexes)
    if index_name not in indexes_by_name:
        raise ValueError('Invalid index: %s for table: %s. Available indexes are: %s' % (
            index_name, self.name, ', '.join(indexes_by_name.keys())
        ))

    key_schema = indexes_by_name[index_name]['KeySchema']
    index_hash_key = next(
        (key for key in key_schema if key['KeyType'] == 'HASH'), None)
    if index_hash_key is None:
        raise ValueError('Missing Hash Key. KeySchema: %s' % key_schema)
    index_range_key = next(
        (key for key in key_schema if key['KeyType'] == 'RANGE'), None)
    if range_comparison and index_range_key is None:
        raise ValueError('Range Key comparison but no range key found for index: %s' % index_name)

    hash_attr = index_hash_key['AttributeName']
    range_attr = index_range_key['AttributeName'] if index_range_key else None

    results = []
    for item in self.all_items():
        if not isinstance(item, Item):
            continue
        item_hash_key = item.attrs.get(hash_attr)
        if not (item_hash_key and item_hash_key == hash_key):
            continue
        if range_comparison:
            item_range_key = item.attrs.get(range_attr)
            # An item without the index range attribute cannot satisfy a
            # range condition; skip it instead of calling compare on None.
            if item_range_key is None:
                continue
            if not item_range_key.compare(range_comparison, range_objs):
                continue
        results.append(item)

    if index_range_key is not None:
        def index_sort_key(item):
            range_value = item.attrs.get(range_attr)
            # Items lacking the range attribute sort first. The leading
            # flag means None is never ordered against a real value.
            if range_value is None:
                return (0, None)
            return (1, range_value.value)

        results.sort(key=index_sort_key)
    return results, last_page
def all_items(self):
@ -361,6 +404,38 @@ class DynamoDBBackend(BaseBackend):
table.throughput = throughput
return table
def update_table_global_indexes(self, name, global_index_updates):
    """Apply Create/Update/Delete operations to a table's global indexes.

    :param name: name of the table, looked up in ``self.tables``.
    :param global_index_updates: list of dicts, each of which may carry a
        ``Create``, ``Update`` and/or ``Delete`` entry holding an index
        description that includes ``IndexName``.
    :returns: the table, with ``global_indexes`` replaced by the new list.
    :raises ValueError: when deleting or updating an index that does not
        exist, or creating one whose name is already taken.
    """
    table = self.tables[name]
    # global_indexes may be unset (None); treat that as "no indexes".
    gsis_by_name = dict((i['IndexName'], i) for i in (table.global_indexes or []))
    for gsi_update in global_index_updates:
        gsi_to_create = gsi_update.get('Create')
        gsi_to_update = gsi_update.get('Update')
        gsi_to_delete = gsi_update.get('Delete')

        if gsi_to_delete:
            index_name = gsi_to_delete['IndexName']
            if index_name not in gsis_by_name:
                raise ValueError('Global Secondary Index does not exist, but tried to delete: %s' %
                                 gsi_to_delete['IndexName'])
            del gsis_by_name[index_name]

        if gsi_to_update:
            index_name = gsi_to_update['IndexName']
            if index_name not in gsis_by_name:
                raise ValueError('Global Secondary Index does not exist, but tried to update: %s' %
                                 gsi_to_update['IndexName'])
            # Merge the supplied fields into the stored index description.
            gsis_by_name[index_name].update(gsi_to_update)

        if gsi_to_create:
            if gsi_to_create['IndexName'] in gsis_by_name:
                raise ValueError('Global Secondary Index already exists: %s' % gsi_to_create['IndexName'])
            gsis_by_name[gsi_to_create['IndexName']] = gsi_to_create

    # Store a real list: on Python 3 dict.values() is a view, which cannot
    # be concatenated with the local index list when a query runs.
    table.global_indexes = list(gsis_by_name.values())
    return table
def put_item(self, table_name, item_attrs, expected=None, overwrite=False):
table = self.tables.get(table_name)
if not table:
@ -400,7 +475,7 @@ class DynamoDBBackend(BaseBackend):
hash_key, range_key = self.get_keys_value(table, keys)
return table.get_item(hash_key, range_key)
def query(self, table_name, hash_key_dict, range_comparison, range_value_dicts):
def query(self, table_name, hash_key_dict, range_comparison, range_value_dicts, index_name=None):
table = self.tables.get(table_name)
if not table:
return None, None
@ -408,7 +483,7 @@ class DynamoDBBackend(BaseBackend):
hash_key = DynamoType(hash_key_dict)
range_values = [DynamoType(range_value) for range_value in range_value_dicts]
return table.query(hash_key, range_comparison, range_values)
return table.query(hash_key, range_comparison, range_values, index_name)
def scan(self, table_name, filters):
table = self.tables.get(table_name)

View File

@ -123,8 +123,11 @@ class DynamoHandler(BaseResponse):
def update_table(self):
    """Handle UpdateTable: apply GSI updates and/or a throughput change.

    Reads ``TableName`` from the request body, then applies
    ``GlobalSecondaryIndexUpdates`` and ``ProvisionedThroughput`` when each
    is present, and returns the JSON dump of the table description.

    :raises ValueError: when the request carries neither
        ``GlobalSecondaryIndexUpdates`` nor ``ProvisionedThroughput``, so
        there is nothing to update.
    """
    name = self.body['TableName']
    table = None
    if 'GlobalSecondaryIndexUpdates' in self.body:
        table = dynamodb_backend2.update_table_global_indexes(
            name, self.body['GlobalSecondaryIndexUpdates'])
    if 'ProvisionedThroughput' in self.body:
        throughput = self.body["ProvisionedThroughput"]
        table = dynamodb_backend2.update_table_throughput(name, throughput)
    if table is None:
        # Without this guard the return below would hit an unbound local.
        raise ValueError(
            'UpdateTable for table %s requires GlobalSecondaryIndexUpdates '
            'or ProvisionedThroughput' % name)
    return dynamo_json_dump(table.describe)
def describe_table(self):
@ -239,11 +242,31 @@ class DynamoHandler(BaseResponse):
if key_condition_expression:
value_alias_map = self.body['ExpressionAttributeValues']
table = dynamodb_backend2.get_table(name)
index_name = self.body.get('IndexName')
if index_name:
all_indexes = (table.global_indexes or []) + (table.indexes or [])
indexes_by_name = dict((i['IndexName'], i) for i in all_indexes)
if index_name not in indexes_by_name:
raise ValueError('Invalid index: %s for table: %s. Available indexes are: %s' % (
index_name, name, ', '.join(indexes_by_name.keys())
))
index = indexes_by_name[index_name]['KeySchema']
else:
index = table.schema
key_map = [column for _, column in sorted((k, v) for k, v in self.body['ExpressionAttributeNames'].items())]
if " AND " in key_condition_expression:
expressions = key_condition_expression.split(" AND ", 1)
hash_key_expression = expressions[0]
index_hash_key = [key for key in index if key['KeyType'] == 'HASH'][0]
hash_key_index_in_key_map = key_map.index(index_hash_key['AttributeName'])
hash_key_expression = expressions.pop(hash_key_index_in_key_map).strip('()')
# TODO implement more than one range expression and OR operators
range_key_expression = expressions[1].replace(")", "")
range_key_expression = expressions[0].strip('()')
range_key_expression_components = range_key_expression.split()
range_comparison = range_key_expression_components[1]
if 'AND' in range_key_expression:
@ -291,7 +314,8 @@ class DynamoHandler(BaseResponse):
range_comparison = None
range_values = []
items, last_page = dynamodb_backend2.query(name, hash_key, range_comparison, range_values)
index_name = self.body.get('IndexName')
items, last_page = dynamodb_backend2.query(name, hash_key, range_comparison, range_values, index_name=index_name)
if items is None:
er = 'com.amazonaws.dynamodb.v20111205#ResourceNotFoundException'
return self.error(er)

View File

@ -1,5 +1,7 @@
from __future__ import unicode_literals
from decimal import Decimal
import boto
import boto3
from boto3.dynamodb.conditions import Key
@ -693,7 +695,7 @@ def test_boto3_conditions():
results['Count'].should.equal(1)
results = table.query(
KeyConditionExpression=Key('forum_name').eq('the-key') & Key("subject").begins_with('7')
KeyConditionExpression=Key("subject").begins_with('7') & Key('forum_name').eq('the-key')
)
results['Count'].should.equal(1)
@ -701,3 +703,428 @@ def test_boto3_conditions():
KeyConditionExpression=Key('forum_name').eq('the-key') & Key("subject").between('567', '890')
)
results['Count'].should.equal(1)
@mock_dynamodb2
def test_boto3_query_gsi_range_comparison():
    """Query the TestGSI index with hash-key and range-key conditions."""
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')

    # Create the DynamoDB table.
    table = dynamodb.create_table(
        TableName='users',
        KeySchema=[
            {'AttributeName': 'forum_name', 'KeyType': 'HASH'},
            {'AttributeName': 'subject', 'KeyType': 'RANGE'},
        ],
        GlobalSecondaryIndexes=[{
            'IndexName': 'TestGSI',
            'KeySchema': [
                {'AttributeName': 'username', 'KeyType': 'HASH'},
                {'AttributeName': 'created', 'KeyType': 'RANGE'},
            ],
            'Projection': {'ProjectionType': 'ALL'},
            'ProvisionedThroughput': {
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5,
            },
        }],
        AttributeDefinitions=[
            {'AttributeName': 'forum_name', 'AttributeType': 'S'},
            {'AttributeName': 'subject', 'AttributeType': 'S'},
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5,
        },
    )
    table = dynamodb.Table('users')

    # Rows as (subject, username, created), all under the same forum_name.
    rows = [
        ('123', 'johndoe', 3),
        ('456', 'johndoe', 1),
        ('789', 'johndoe', 2),
        ('159', 'janedoe', 2),
        ('601', 'janedoe', 5),
    ]
    for subject, username, created in rows:
        table.put_item(Item={
            'forum_name': 'the-key',
            'subject': subject,
            'username': username,
            'created': created,
        })

    # Test a query returning all johndoe items
    results = table.query(
        KeyConditionExpression=Key('username').eq('johndoe') & Key("created").gt('0'),
        ScanIndexForward=True,
        IndexName='TestGSI',
    )
    expected = ["456", "789", "123"]
    for position, row in enumerate(results['Items']):
        row["subject"].should.equal(expected[position])

    # Return all johndoe items again, but in reverse
    results = table.query(
        KeyConditionExpression=Key('username').eq('johndoe') & Key("created").gt('0'),
        ScanIndexForward=False,
        IndexName='TestGSI',
    )
    for position, row in enumerate(reversed(results['Items'])):
        row["subject"].should.equal(expected[position])

    # Filter the creation to only return some of the results
    # And reverse order of hash + range key
    results = table.query(
        KeyConditionExpression=Key("created").gt('1') & Key('username').eq('johndoe'),
        ConsistentRead=True,
        IndexName='TestGSI',
    )
    results['Count'].should.equal(2)

    # Filter to return no results
    results = table.query(
        KeyConditionExpression=Key('username').eq('janedoe') & Key("created").gt('9'),
        IndexName='TestGSI',
    )
    results['Count'].should.equal(0)

    results = table.query(
        KeyConditionExpression=Key('username').eq('janedoe') & Key("created").eq('5'),
        IndexName='TestGSI',
    )
    results['Count'].should.equal(1)

    # Test range key sorting
    results = table.query(
        KeyConditionExpression=Key('username').eq('johndoe') & Key("created").gt('0'),
        IndexName='TestGSI',
    )
    expected = [Decimal('1'), Decimal('2'), Decimal('3')]
    for position, row in enumerate(results['Items']):
        row["created"].should.equal(expected[position])
@mock_dynamodb2
def test_update_table_throughput():
    """Updating ProvisionedThroughput changes the table's capacity units."""
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')

    # Create the DynamoDB table.
    table = dynamodb.create_table(
        TableName='users',
        KeySchema=[
            {'AttributeName': 'forum_name', 'KeyType': 'HASH'},
            {'AttributeName': 'subject', 'KeyType': 'RANGE'},
        ],
        AttributeDefinitions=[
            {'AttributeName': 'forum_name', 'AttributeType': 'S'},
            {'AttributeName': 'subject', 'AttributeType': 'S'},
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 6,
        },
    )
    table = dynamodb.Table('users')

    # Throughput as given at creation time.
    table.provisioned_throughput['ReadCapacityUnits'].should.equal(5)
    table.provisioned_throughput['WriteCapacityUnits'].should.equal(6)

    new_throughput = {
        'ReadCapacityUnits': 10,
        'WriteCapacityUnits': 11,
    }
    table.update(ProvisionedThroughput=new_throughput)

    # Re-fetch the table resource and check the updated values.
    table = dynamodb.Table('users')

    table.provisioned_throughput['ReadCapacityUnits'].should.equal(10)
    table.provisioned_throughput['WriteCapacityUnits'].should.equal(11)
@mock_dynamodb2
def test_update_table_gsi_throughput():
    """Updating a GSI's throughput leaves the table's own throughput alone."""
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')

    # Create the DynamoDB table.
    table = dynamodb.create_table(
        TableName='users',
        KeySchema=[
            {'AttributeName': 'forum_name', 'KeyType': 'HASH'},
            {'AttributeName': 'subject', 'KeyType': 'RANGE'},
        ],
        GlobalSecondaryIndexes=[{
            'IndexName': 'TestGSI',
            'KeySchema': [
                {'AttributeName': 'username', 'KeyType': 'HASH'},
                {'AttributeName': 'created', 'KeyType': 'RANGE'},
            ],
            'Projection': {'ProjectionType': 'ALL'},
            'ProvisionedThroughput': {
                'ReadCapacityUnits': 3,
                'WriteCapacityUnits': 4,
            },
        }],
        AttributeDefinitions=[
            {'AttributeName': 'forum_name', 'AttributeType': 'S'},
            {'AttributeName': 'subject', 'AttributeType': 'S'},
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 6,
        },
    )
    table = dynamodb.Table('users')

    # Index and table throughput as given at creation time.
    gsi_throughput = table.global_secondary_indexes[0]['ProvisionedThroughput']
    gsi_throughput['ReadCapacityUnits'].should.equal(3)
    gsi_throughput['WriteCapacityUnits'].should.equal(4)

    table.provisioned_throughput['ReadCapacityUnits'].should.equal(5)
    table.provisioned_throughput['WriteCapacityUnits'].should.equal(6)

    gsi_change = {
        'Update': {
            'IndexName': 'TestGSI',
            'ProvisionedThroughput': {
                'ReadCapacityUnits': 10,
                'WriteCapacityUnits': 11,
            },
        },
    }
    table.update(GlobalSecondaryIndexUpdates=[gsi_change])

    table = dynamodb.Table('users')

    # Primary throughput has not changed
    table.provisioned_throughput['ReadCapacityUnits'].should.equal(5)
    table.provisioned_throughput['WriteCapacityUnits'].should.equal(6)

    gsi_throughput = table.global_secondary_indexes[0]['ProvisionedThroughput']
    gsi_throughput['ReadCapacityUnits'].should.equal(10)
    gsi_throughput['WriteCapacityUnits'].should.equal(11)
@mock_dynamodb2
def test_update_table_gsi_create():
    """Create, update and then delete a GSI through table.update()."""
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')

    # Create the DynamoDB table.
    table = dynamodb.create_table(
        TableName='users',
        KeySchema=[
            {'AttributeName': 'forum_name', 'KeyType': 'HASH'},
            {'AttributeName': 'subject', 'KeyType': 'RANGE'},
        ],
        AttributeDefinitions=[
            {'AttributeName': 'forum_name', 'AttributeType': 'S'},
            {'AttributeName': 'subject', 'AttributeType': 'S'},
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 6,
        },
    )
    table = dynamodb.Table('users')

    table.global_secondary_indexes.should.have.length_of(0)

    gsi_create = {
        'Create': {
            'IndexName': 'TestGSI',
            'KeySchema': [
                {'AttributeName': 'username', 'KeyType': 'HASH'},
                {'AttributeName': 'created', 'KeyType': 'RANGE'},
            ],
            'Projection': {'ProjectionType': 'ALL'},
            'ProvisionedThroughput': {
                'ReadCapacityUnits': 3,
                'WriteCapacityUnits': 4,
            },
        },
    }
    table.update(GlobalSecondaryIndexUpdates=[gsi_create])

    table = dynamodb.Table('users')
    table.global_secondary_indexes.should.have.length_of(1)

    gsi_throughput = table.global_secondary_indexes[0]['ProvisionedThroughput']
    assert gsi_throughput['ReadCapacityUnits'].should.equal(3)
    assert gsi_throughput['WriteCapacityUnits'].should.equal(4)

    # Check update works
    gsi_change = {
        'Update': {
            'IndexName': 'TestGSI',
            'ProvisionedThroughput': {
                'ReadCapacityUnits': 10,
                'WriteCapacityUnits': 11,
            },
        },
    }
    table.update(GlobalSecondaryIndexUpdates=[gsi_change])
    table = dynamodb.Table('users')

    gsi_throughput = table.global_secondary_indexes[0]['ProvisionedThroughput']
    assert gsi_throughput['ReadCapacityUnits'].should.equal(10)
    assert gsi_throughput['WriteCapacityUnits'].should.equal(11)

    # Finally remove the index again.
    gsi_delete = {
        'Delete': {
            'IndexName': 'TestGSI',
        },
    }
    table.update(GlobalSecondaryIndexUpdates=[gsi_delete])

    table = dynamodb.Table('users')
    table.global_secondary_indexes.should.have.length_of(0)
@mock_dynamodb2
def test_update_table_gsi_delete():
    """Deleting a GSI through table.update() removes it from the table.

    This test used to be named ``test_update_table_gsi_throughput``, the
    same name as an earlier test in this module. The later definition
    replaced the earlier one, so the throughput test never ran. The name
    now matches what the body checks.
    """
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')

    # Create the DynamoDB table.
    table = dynamodb.create_table(
        TableName='users',
        KeySchema=[
            {
                'AttributeName': 'forum_name',
                'KeyType': 'HASH'
            },
            {
                'AttributeName': 'subject',
                'KeyType': 'RANGE'
            },
        ],
        GlobalSecondaryIndexes=[{
            'IndexName': 'TestGSI',
            'KeySchema': [
                {
                    'AttributeName': 'username',
                    'KeyType': 'HASH',
                },
                {
                    'AttributeName': 'created',
                    'KeyType': 'RANGE',
                }
            ],
            'Projection': {
                'ProjectionType': 'ALL',
            },
            'ProvisionedThroughput': {
                'ReadCapacityUnits': 3,
                'WriteCapacityUnits': 4
            }
        }],
        AttributeDefinitions=[
            {
                'AttributeName': 'forum_name',
                'AttributeType': 'S'
            },
            {
                'AttributeName': 'subject',
                'AttributeType': 'S'
            },
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 6
        }
    )
    table = dynamodb.Table('users')
    # The index exists right after table creation.
    table.global_secondary_indexes.should.have.length_of(1)

    table.update(GlobalSecondaryIndexUpdates=[{
        'Delete': {
            'IndexName': 'TestGSI',
        },
    }])

    # Re-fetch the table resource and confirm the index is gone.
    table = dynamodb.Table('users')
    table.global_secondary_indexes.should.have.length_of(0)