add dynamodb pagination
This commit is contained in:
parent
9b0be24b28
commit
6bab725b36
@ -308,9 +308,9 @@ class Table(object):
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
def query(self, hash_key, range_comparison, range_objs, index_name=None):
|
||||
def query(self, hash_key, range_comparison, range_objs, limit,
|
||||
exclusive_start_key, scan_index_forward, index_name=None):
|
||||
results = []
|
||||
last_page = True # Once pagination is implemented, change this
|
||||
|
||||
if index_name:
|
||||
all_indexes = (self.global_indexes or []) + (self.indexes or [])
|
||||
@ -365,7 +365,11 @@ class Table(object):
|
||||
if item.attrs.get(index_range_key['AttributeName']) else None)
|
||||
else:
|
||||
results.sort(key=lambda item: item.range_key)
|
||||
return results, last_page
|
||||
|
||||
if scan_index_forward is False:
|
||||
results.reverse()
|
||||
|
||||
return self._trim_results(results, limit, exclusive_start_key)
|
||||
|
||||
def all_items(self):
|
||||
for hash_set in self.items.values():
|
||||
@ -375,10 +379,9 @@ class Table(object):
|
||||
else:
|
||||
yield hash_set
|
||||
|
||||
def scan(self, filters):
|
||||
def scan(self, filters, limit, exclusive_start_key):
|
||||
results = []
|
||||
scanned_count = 0
|
||||
last_page = True # Once pagination is implemented, change this
|
||||
|
||||
for result in self.all_items():
|
||||
scanned_count += 1
|
||||
@ -401,7 +404,33 @@ class Table(object):
|
||||
|
||||
if passes_all_conditions:
|
||||
results.append(result)
|
||||
return results, scanned_count, last_page
|
||||
|
||||
results, last_evaluated_key = self._trim_results(results, limit,
|
||||
exclusive_start_key)
|
||||
return results, scanned_count, last_evaluated_key
|
||||
|
||||
def _trim_results(self, results, limit, exclusive_start_key):
|
||||
if exclusive_start_key is not None:
|
||||
hash_key = DynamoType(exclusive_start_key.get(self.hash_key_attr))
|
||||
range_key = exclusive_start_key.get(self.range_key_attr)
|
||||
if range_key is not None:
|
||||
range_key = DynamoType(range_key)
|
||||
for i in range(len(results)):
|
||||
if results[i].hash_key == hash_key and results[i].range_key == range_key:
|
||||
results = results[i + 1:]
|
||||
break
|
||||
|
||||
last_evaluated_key = None
|
||||
if limit and len(results) > limit:
|
||||
results = results[:limit]
|
||||
last_evaluated_key = {
|
||||
self.hash_key_attr: results[-1].hash_key
|
||||
}
|
||||
if results[-1].range_key is not None:
|
||||
last_evaluated_key[self.range_key_attr] = results[-1].range_key
|
||||
|
||||
return results, last_evaluated_key
|
||||
|
||||
|
||||
def lookup(self, *args, **kwargs):
|
||||
if not self.schema:
|
||||
@ -507,7 +536,8 @@ class DynamoDBBackend(BaseBackend):
|
||||
hash_key, range_key = self.get_keys_value(table, keys)
|
||||
return table.get_item(hash_key, range_key)
|
||||
|
||||
def query(self, table_name, hash_key_dict, range_comparison, range_value_dicts, index_name=None):
|
||||
def query(self, table_name, hash_key_dict, range_comparison, range_value_dicts,
|
||||
limit, exclusive_start_key, scan_index_forward, index_name=None):
|
||||
table = self.tables.get(table_name)
|
||||
if not table:
|
||||
return None, None
|
||||
@ -515,9 +545,10 @@ class DynamoDBBackend(BaseBackend):
|
||||
hash_key = DynamoType(hash_key_dict)
|
||||
range_values = [DynamoType(range_value) for range_value in range_value_dicts]
|
||||
|
||||
return table.query(hash_key, range_comparison, range_values, index_name)
|
||||
return table.query(hash_key, range_comparison, range_values, limit,
|
||||
exclusive_start_key, scan_index_forward, index_name)
|
||||
|
||||
def scan(self, table_name, filters):
|
||||
def scan(self, table_name, filters, limit, exclusive_start_key):
|
||||
table = self.tables.get(table_name)
|
||||
if not table:
|
||||
return None, None, None
|
||||
@ -527,7 +558,7 @@ class DynamoDBBackend(BaseBackend):
|
||||
dynamo_types = [DynamoType(value) for value in comparison_values]
|
||||
scan_filters[key] = (comparison_operator, dynamo_types)
|
||||
|
||||
return table.scan(scan_filters)
|
||||
return table.scan(scan_filters, limit, exclusive_start_key)
|
||||
|
||||
def update_item(self, table_name, key, update_expression, attribute_updates):
|
||||
table = self.get_table(table_name)
|
||||
|
@ -238,7 +238,6 @@ class DynamoHandler(BaseResponse):
|
||||
|
||||
def query(self):
|
||||
name = self.body['TableName']
|
||||
|
||||
# {u'KeyConditionExpression': u'#n0 = :v0', u'ExpressionAttributeValues': {u':v0': {u'S': u'johndoe'}}, u'ExpressionAttributeNames': {u'#n0': u'username'}}
|
||||
key_condition_expression = self.body.get('KeyConditionExpression')
|
||||
if key_condition_expression:
|
||||
@ -317,19 +316,16 @@ class DynamoHandler(BaseResponse):
|
||||
range_values = []
|
||||
|
||||
index_name = self.body.get('IndexName')
|
||||
items, last_page = dynamodb_backend2.query(name, hash_key, range_comparison, range_values, index_name=index_name)
|
||||
exclusive_start_key = self.body.get('ExclusiveStartKey')
|
||||
limit = self.body.get("Limit")
|
||||
scan_index_forward = self.body.get("ScanIndexForward")
|
||||
items, last_evaluated_key = dynamodb_backend2.query(
|
||||
name, hash_key, range_comparison, range_values, limit,
|
||||
exclusive_start_key, scan_index_forward, index_name=index_name)
|
||||
if items is None:
|
||||
er = 'com.amazonaws.dynamodb.v20111205#ResourceNotFoundException'
|
||||
return self.error(er)
|
||||
|
||||
reversed = self.body.get("ScanIndexForward")
|
||||
if reversed is False:
|
||||
items.reverse()
|
||||
|
||||
limit = self.body.get("Limit")
|
||||
if limit:
|
||||
items = items[:limit]
|
||||
|
||||
result = {
|
||||
"Count": len(items),
|
||||
"ConsumedCapacityUnits": 1,
|
||||
@ -337,12 +333,9 @@ class DynamoHandler(BaseResponse):
|
||||
if self.body.get('Select', '').upper() != 'COUNT':
|
||||
result["Items"] = [item.attrs for item in items]
|
||||
|
||||
# Implement this when we do pagination
|
||||
# if not last_page:
|
||||
# result["LastEvaluatedKey"] = {
|
||||
# "HashKeyElement": items[-1].hash_key,
|
||||
# "RangeKeyElement": items[-1].range_key,
|
||||
# }
|
||||
if last_evaluated_key is not None:
|
||||
result["LastEvaluatedKey"] = last_evaluated_key
|
||||
|
||||
return dynamo_json_dump(result)
|
||||
|
||||
def scan(self):
|
||||
@ -356,29 +349,25 @@ class DynamoHandler(BaseResponse):
|
||||
comparison_values = scan_filter.get("AttributeValueList", [])
|
||||
filters[attribute_name] = (comparison_operator, comparison_values)
|
||||
|
||||
items, scanned_count, last_page = dynamodb_backend2.scan(name, filters)
|
||||
exclusive_start_key = self.body.get('ExclusiveStartKey')
|
||||
limit = self.body.get("Limit")
|
||||
|
||||
items, scanned_count, last_evaluated_key = dynamodb_backend2.scan(name, filters,
|
||||
limit,
|
||||
exclusive_start_key)
|
||||
|
||||
if items is None:
|
||||
er = 'com.amazonaws.dynamodb.v20111205#ResourceNotFoundException'
|
||||
return self.error(er)
|
||||
|
||||
limit = self.body.get("Limit")
|
||||
if limit:
|
||||
items = items[:limit]
|
||||
|
||||
result = {
|
||||
"Count": len(items),
|
||||
"Items": [item.attrs for item in items],
|
||||
"ConsumedCapacityUnits": 1,
|
||||
"ScannedCount": scanned_count
|
||||
}
|
||||
|
||||
# Implement this when we do pagination
|
||||
# if not last_page:
|
||||
# result["LastEvaluatedKey"] = {
|
||||
# "HashKeyElement": items[-1].hash_key,
|
||||
# "RangeKeyElement": items[-1].range_key,
|
||||
# }
|
||||
if last_evaluated_key is not None:
|
||||
result["LastEvaluatedKey"] = last_evaluated_key
|
||||
return dynamo_json_dump(result)
|
||||
|
||||
def delete_item(self):
|
||||
|
@ -941,7 +941,6 @@ def test_update_item_range_key_set():
|
||||
})
|
||||
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_update_item_does_not_exist_is_created():
|
||||
table = _create_table_with_range_key()
|
||||
@ -1405,3 +1404,36 @@ def test_update_table_gsi_throughput():
|
||||
|
||||
table = dynamodb.Table('users')
|
||||
table.global_secondary_indexes.should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_query_pagination():
|
||||
table = _create_table_with_range_key()
|
||||
for i in range(10):
|
||||
table.put_item(Item={
|
||||
'forum_name': 'the-key',
|
||||
'subject': '{0}'.format(i),
|
||||
'username': 'johndoe',
|
||||
'created': Decimal('3'),
|
||||
})
|
||||
|
||||
page1 = table.query(
|
||||
KeyConditionExpression=Key('forum_name').eq('the-key'),
|
||||
Limit=6
|
||||
)
|
||||
page1['Count'].should.equal(6)
|
||||
page1['Items'].should.have.length_of(6)
|
||||
page1.should.have.key('LastEvaluatedKey')
|
||||
|
||||
page2 = table.query(
|
||||
KeyConditionExpression=Key('forum_name').eq('the-key'),
|
||||
Limit=6,
|
||||
ExclusiveStartKey=page1['LastEvaluatedKey']
|
||||
)
|
||||
page2['Count'].should.equal(4)
|
||||
page2['Items'].should.have.length_of(4)
|
||||
page2.should_not.have.key('LastEvaluatedKey')
|
||||
|
||||
results = page1['Items'] + page2['Items']
|
||||
subjects = set([int(r['subject']) for r in results])
|
||||
subjects.should.equal(set(range(10)))
|
||||
|
@ -520,11 +520,9 @@ boto3
|
||||
"""
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_boto3_conditions():
|
||||
def _create_user_table():
|
||||
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
|
||||
|
||||
# Create the DynamoDB table.
|
||||
table = dynamodb.create_table(
|
||||
TableName='users',
|
||||
KeySchema=[
|
||||
@ -544,7 +542,12 @@ def test_boto3_conditions():
|
||||
'WriteCapacityUnits': 5
|
||||
}
|
||||
)
|
||||
table = dynamodb.Table('users')
|
||||
return dynamodb.Table('users')
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_boto3_conditions():
|
||||
table = _create_user_table()
|
||||
|
||||
table.put_item(Item={'username': 'johndoe'})
|
||||
table.put_item(Item={'username': 'janedoe'})
|
||||
@ -555,3 +558,27 @@ def test_boto3_conditions():
|
||||
response['Count'].should.equal(1)
|
||||
response['Items'].should.have.length_of(1)
|
||||
response['Items'][0].should.equal({"username": "johndoe"})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_scan_pagination():
|
||||
table = _create_user_table()
|
||||
|
||||
expected_usernames = ['user{0}'.format(i) for i in range(10)]
|
||||
for u in expected_usernames:
|
||||
table.put_item(Item={'username': u})
|
||||
|
||||
page1 = table.scan(Limit=6)
|
||||
page1['Count'].should.equal(6)
|
||||
page1['Items'].should.have.length_of(6)
|
||||
page1.should.have.key('LastEvaluatedKey')
|
||||
|
||||
page2 = table.scan(Limit=6,
|
||||
ExclusiveStartKey=page1['LastEvaluatedKey'])
|
||||
page2['Count'].should.equal(4)
|
||||
page2['Items'].should.have.length_of(4)
|
||||
page2.should_not.have.key('LastEvaluatedKey')
|
||||
|
||||
results = page1['Items'] + page2['Items']
|
||||
usernames = set([r['username'] for r in results])
|
||||
usernames.should.equal(set(expected_usernames))
|
||||
|
Loading…
Reference in New Issue
Block a user