Merge pull request #3071 from bblommers/dynamodb_gsi_add_throughput

DynamoDB - Add default GSI throughput
This commit is contained in:
Steve Pulec 2020-06-13 18:59:33 -05:00 committed by GitHub
commit 1f2e6b8925
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 166 additions and 25 deletions

View File

@ -272,6 +272,66 @@ class StreamShard(BaseModel):
return [i.to_json() for i in self.items[start:end]]
class LocalSecondaryIndex(BaseModel):
def __init__(self, index_name, schema, projection):
self.name = index_name
self.schema = schema
self.projection = projection
def describe(self):
return {
"IndexName": self.name,
"KeySchema": self.schema,
"Projection": self.projection,
}
@staticmethod
def create(dct):
return LocalSecondaryIndex(
index_name=dct["IndexName"],
schema=dct["KeySchema"],
projection=dct["Projection"],
)
class GlobalSecondaryIndex(BaseModel):
def __init__(
self, index_name, schema, projection, status="ACTIVE", throughput=None
):
self.name = index_name
self.schema = schema
self.projection = projection
self.status = status
self.throughput = throughput or {
"ReadCapacityUnits": 0,
"WriteCapacityUnits": 0,
}
def describe(self):
return {
"IndexName": self.name,
"KeySchema": self.schema,
"Projection": self.projection,
"IndexStatus": self.status,
"ProvisionedThroughput": self.throughput,
}
@staticmethod
def create(dct):
return GlobalSecondaryIndex(
index_name=dct["IndexName"],
schema=dct["KeySchema"],
projection=dct["Projection"],
throughput=dct.get("ProvisionedThroughput", None),
)
def update(self, u):
self.name = u.get("IndexName", self.name)
self.schema = u.get("KeySchema", self.schema)
self.projection = u.get("Projection", self.projection)
self.throughput = u.get("ProvisionedThroughput", self.throughput)
class Table(BaseModel):
def __init__(
self,
@ -302,12 +362,13 @@ class Table(BaseModel):
else:
self.throughput = throughput
self.throughput["NumberOfDecreasesToday"] = 0
self.indexes = indexes
self.global_indexes = global_indexes if global_indexes else []
for index in self.global_indexes:
index[
"IndexStatus"
] = "ACTIVE" # One of 'CREATING'|'UPDATING'|'DELETING'|'ACTIVE'
self.indexes = [
LocalSecondaryIndex.create(i) for i in (indexes if indexes else [])
]
self.global_indexes = [
GlobalSecondaryIndex.create(i)
for i in (global_indexes if global_indexes else [])
]
self.created_at = datetime.datetime.utcnow()
self.items = defaultdict(dict)
self.table_arn = self._generate_arn(table_name)
@ -376,8 +437,10 @@ class Table(BaseModel):
"KeySchema": self.schema,
"ItemCount": len(self),
"CreationDateTime": unix_time(self.created_at),
"GlobalSecondaryIndexes": [index for index in self.global_indexes],
"LocalSecondaryIndexes": [index for index in self.indexes],
"GlobalSecondaryIndexes": [
index.describe() for index in self.global_indexes
],
"LocalSecondaryIndexes": [index.describe() for index in self.indexes],
}
}
if self.stream_specification and self.stream_specification["StreamEnabled"]:
@ -403,7 +466,7 @@ class Table(BaseModel):
keys = [self.hash_key_attr]
for index in self.global_indexes:
hash_key = None
for key in index["KeySchema"]:
for key in index.schema:
if key["KeyType"] == "HASH":
hash_key = key["AttributeName"]
keys.append(hash_key)
@ -414,7 +477,7 @@ class Table(BaseModel):
keys = [self.range_key_attr]
for index in self.global_indexes:
range_key = None
for key in index["KeySchema"]:
for key in index.schema:
if key["KeyType"] == "RANGE":
range_key = keys.append(key["AttributeName"])
keys.append(range_key)
@ -547,7 +610,7 @@ class Table(BaseModel):
if index_name:
all_indexes = self.all_indexes()
indexes_by_name = dict((i["IndexName"], i) for i in all_indexes)
indexes_by_name = dict((i.name, i) for i in all_indexes)
if index_name not in indexes_by_name:
raise ValueError(
"Invalid index: %s for table: %s. Available indexes are: %s"
@ -557,14 +620,14 @@ class Table(BaseModel):
index = indexes_by_name[index_name]
try:
index_hash_key = [
key for key in index["KeySchema"] if key["KeyType"] == "HASH"
key for key in index.schema if key["KeyType"] == "HASH"
][0]
except IndexError:
raise ValueError("Missing Hash Key. KeySchema: %s" % index["KeySchema"])
raise ValueError("Missing Hash Key. KeySchema: %s" % index.name)
try:
index_range_key = [
key for key in index["KeySchema"] if key["KeyType"] == "RANGE"
key for key in index.schema if key["KeyType"] == "RANGE"
][0]
except IndexError:
index_range_key = None
@ -669,9 +732,9 @@ class Table(BaseModel):
def has_idx_items(self, index_name):
all_indexes = self.all_indexes()
indexes_by_name = dict((i["IndexName"], i) for i in all_indexes)
indexes_by_name = dict((i.name, i) for i in all_indexes)
idx = indexes_by_name[index_name]
idx_col_set = set([i["AttributeName"] for i in idx["KeySchema"]])
idx_col_set = set([i["AttributeName"] for i in idx.schema])
for hash_set in self.items.values():
if self.range_key_attr:
@ -694,7 +757,7 @@ class Table(BaseModel):
results = []
scanned_count = 0
all_indexes = self.all_indexes()
indexes_by_name = dict((i["IndexName"], i) for i in all_indexes)
indexes_by_name = dict((i.name, i) for i in all_indexes)
if index_name:
if index_name not in indexes_by_name:
@ -775,9 +838,9 @@ class Table(BaseModel):
if scanned_index:
all_indexes = self.all_indexes()
indexes_by_name = dict((i["IndexName"], i) for i in all_indexes)
indexes_by_name = dict((i.name, i) for i in all_indexes)
idx = indexes_by_name[scanned_index]
idx_col_list = [i["AttributeName"] for i in idx["KeySchema"]]
idx_col_list = [i["AttributeName"] for i in idx.schema]
for col in idx_col_list:
last_evaluated_key[col] = results[-1].attrs[col]
@ -887,7 +950,7 @@ class DynamoDBBackend(BaseBackend):
def update_table_global_indexes(self, name, global_index_updates):
table = self.tables[name]
gsis_by_name = dict((i["IndexName"], i) for i in table.global_indexes)
gsis_by_name = dict((i.name, i) for i in table.global_indexes)
for gsi_update in global_index_updates:
gsi_to_create = gsi_update.get("Create")
gsi_to_update = gsi_update.get("Update")
@ -908,7 +971,7 @@ class DynamoDBBackend(BaseBackend):
if index_name not in gsis_by_name:
raise ValueError(
"Global Secondary Index does not exist, but tried to update: %s"
% gsi_to_update["IndexName"]
% index_name
)
gsis_by_name[index_name].update(gsi_to_update)
@ -919,7 +982,9 @@ class DynamoDBBackend(BaseBackend):
% gsi_to_create["IndexName"]
)
gsis_by_name[gsi_to_create["IndexName"]] = gsi_to_create
gsis_by_name[gsi_to_create["IndexName"]] = GlobalSecondaryIndex.create(
gsi_to_create
)
# in python 3.6, dict.values() returns a dict_values object, but we expect it to be a list in other
# parts of the codebase

View File

@ -431,7 +431,6 @@ class DynamoHandler(BaseResponse):
def query(self):
name = self.body["TableName"]
# {u'KeyConditionExpression': u'#n0 = :v0', u'ExpressionAttributeValues': {u':v0': {u'S': u'johndoe'}}, u'ExpressionAttributeNames': {u'#n0': u'username'}}
key_condition_expression = self.body.get("KeyConditionExpression")
projection_expression = self.body.get("ProjectionExpression")
expression_attribute_names = self.body.get("ExpressionAttributeNames", {})
@ -459,7 +458,7 @@ class DynamoHandler(BaseResponse):
index_name = self.body.get("IndexName")
if index_name:
all_indexes = (table.global_indexes or []) + (table.indexes or [])
indexes_by_name = dict((i["IndexName"], i) for i in all_indexes)
indexes_by_name = dict((i.name, i) for i in all_indexes)
if index_name not in indexes_by_name:
er = "com.amazonaws.dynamodb.v20120810#ResourceNotFoundException"
return self.error(
@ -469,7 +468,7 @@ class DynamoHandler(BaseResponse):
),
)
index = indexes_by_name[index_name]["KeySchema"]
index = indexes_by_name[index_name].schema
else:
index = table.schema

View File

@ -931,6 +931,83 @@ boto3
"""
@mock_dynamodb2
def test_boto3_create_table_with_gsi():
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
table = dynamodb.create_table(
TableName="users",
KeySchema=[
{"AttributeName": "forum_name", "KeyType": "HASH"},
{"AttributeName": "subject", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "forum_name", "AttributeType": "S"},
{"AttributeName": "subject", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
GlobalSecondaryIndexes=[
{
"IndexName": "test_gsi",
"KeySchema": [{"AttributeName": "subject", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
}
],
)
table["TableDescription"]["GlobalSecondaryIndexes"].should.equal(
[
{
"KeySchema": [{"KeyType": "HASH", "AttributeName": "subject"}],
"IndexName": "test_gsi",
"Projection": {"ProjectionType": "ALL"},
"IndexStatus": "ACTIVE",
"ProvisionedThroughput": {
"ReadCapacityUnits": 0,
"WriteCapacityUnits": 0,
},
}
]
)
table = dynamodb.create_table(
TableName="users2",
KeySchema=[
{"AttributeName": "forum_name", "KeyType": "HASH"},
{"AttributeName": "subject", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "forum_name", "AttributeType": "S"},
{"AttributeName": "subject", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
GlobalSecondaryIndexes=[
{
"IndexName": "test_gsi",
"KeySchema": [{"AttributeName": "subject", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
"ProvisionedThroughput": {
"ReadCapacityUnits": 3,
"WriteCapacityUnits": 5,
},
}
],
)
table["TableDescription"]["GlobalSecondaryIndexes"].should.equal(
[
{
"KeySchema": [{"KeyType": "HASH", "AttributeName": "subject"}],
"IndexName": "test_gsi",
"Projection": {"ProjectionType": "ALL"},
"IndexStatus": "ACTIVE",
"ProvisionedThroughput": {
"ReadCapacityUnits": 3,
"WriteCapacityUnits": 5,
},
}
]
)
@mock_dynamodb2
def test_boto3_conditions():
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")