DynamoDB - Improve handling of ReturnConsumedCapacity-param (#4241)

This commit is contained in:
Bert Blommers 2021-08-29 19:04:42 +01:00 committed by GitHub
parent 58da62cc71
commit cc568c1656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 202 additions and 162 deletions

View File

@ -5,6 +5,7 @@ import json
import re
import itertools
from functools import wraps
from moto.core.responses import BaseResponse
from moto.core.utils import camelcase_to_underscores, amz_crc32, amzn_request_id
@ -19,6 +20,55 @@ from moto.dynamodb2.models import dynamodb_backends, dynamo_json_dump
TRANSACTION_MAX_ITEMS = 25
def include_consumed_capacity(val=1.0):
def _inner(f):
@wraps(f)
def _wrapper(*args, **kwargs):
(handler,) = args
expected_capacity = handler.body.get("ReturnConsumedCapacity", "NONE")
if expected_capacity not in ["NONE", "TOTAL", "INDEXES"]:
type_ = "ValidationException"
message = "1 validation error detected: Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: Member must satisfy enum value set: [INDEXES, TOTAL, NONE]".format(
expected_capacity
)
return (
400,
handler.response_headers,
dynamo_json_dump({"__type": type_, "message": message}),
)
table_name = handler.body.get("TableName", "")
index_name = handler.body.get("IndexName", None)
response = f(*args, **kwargs)
if isinstance(response, str):
body = json.loads(response)
if expected_capacity == "TOTAL":
body["ConsumedCapacity"] = {
"TableName": table_name,
"CapacityUnits": val,
}
elif expected_capacity == "INDEXES":
body["ConsumedCapacity"] = {
"TableName": table_name,
"CapacityUnits": val,
"Table": {"CapacityUnits": val},
}
if index_name:
body["ConsumedCapacity"]["LocalSecondaryIndexes"] = {
index_name: {"CapacityUnits": val}
}
return dynamo_json_dump(body)
return response
return _wrapper
return _inner
def put_has_empty_keys(field_updates, table):
if table:
key_names = table.key_attributes
@ -254,6 +304,7 @@ class DynamoHandler(BaseResponse):
er = "com.amazonaws.dynamodb.v20111205#ResourceNotFoundException"
return self.error(er, "Requested resource not found")
@include_consumed_capacity()
def put_item(self):
name = self.body["TableName"]
item = self.body["Item"]
@ -310,7 +361,6 @@ class DynamoHandler(BaseResponse):
if result:
item_dict = result.to_json()
item_dict["ConsumedCapacity"] = {"TableName": name, "CapacityUnits": 1}
if return_values == "ALL_OLD":
item_dict["Attributes"] = existing_attributes
else:
@ -349,6 +399,7 @@ class DynamoHandler(BaseResponse):
return dynamo_json_dump(response)
@include_consumed_capacity(0.5)
def get_item(self):
name = self.body["TableName"]
table = self.dynamodb_backend.get_table(name)
@ -372,11 +423,10 @@ class DynamoHandler(BaseResponse):
return self.error(er, "Validation Exception")
if item:
item_dict = item.describe_attrs(attributes=None)
item_dict["ConsumedCapacity"] = {"TableName": name, "CapacityUnits": 0.5}
return dynamo_json_dump(item_dict)
else:
# Item not found
return 200, self.response_headers, "{}"
return dynamo_json_dump({})
def batch_get_item(self):
table_batches = self.body["RequestItems"]
@ -441,6 +491,7 @@ class DynamoHandler(BaseResponse):
unique_keys.append(k)
return False
@include_consumed_capacity()
def query(self):
name = self.body["TableName"]
key_condition_expression = self.body.get("KeyConditionExpression")
@ -618,7 +669,6 @@ class DynamoHandler(BaseResponse):
result = {
"Count": len(items),
"ConsumedCapacity": {"TableName": name, "CapacityUnits": 1},
"ScannedCount": scanned_count,
}
@ -649,6 +699,7 @@ class DynamoHandler(BaseResponse):
return projection_expression
@include_consumed_capacity()
def scan(self):
name = self.body["TableName"]
@ -700,7 +751,6 @@ class DynamoHandler(BaseResponse):
result = {
"Count": len(items),
"Items": [item.attrs for item in items],
"ConsumedCapacity": {"TableName": name, "CapacityUnits": 1},
"ScannedCount": scanned_count,
}
if last_evaluated_key is not None:

View File

@ -589,42 +589,6 @@ def test_query_invalid_table():
assert exception.response["Error"]["Code"] == "ResourceNotFoundException"
@requires_boto_gte("2.9")
@mock_dynamodb2
def test_scan_returns_consumed_capacity():
name = "TestTable"
conn = boto3.client(
"dynamodb",
region_name="us-west-2",
aws_access_key_id="ak",
aws_secret_access_key="sk",
)
conn.create_table(
TableName=name,
KeySchema=[{"AttributeName": "forum_name", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "forum_name", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
conn.put_item(
TableName=name,
Item={
"forum_name": {"S": "LOLCat Forum"},
"subject": {"S": "Check this out!"},
"Body": {"S": "http://url_to_lolcat.gif"},
"SentBy": {"S": "test"},
"ReceivedTime": {"S": "12/9/2011 11:36:03 PM"},
},
)
response = conn.scan(TableName=name)
assert "ConsumedCapacity" in response
assert "CapacityUnits" in response["ConsumedCapacity"]
assert response["ConsumedCapacity"]["TableName"] == name
@requires_boto_gte("2.9")
@mock_dynamodb2
def test_put_item_with_special_chars():
@ -710,37 +674,6 @@ def test_put_item_with_streams():
stream_record["dynamodb"]["SizeBytes"].should.be.equal(447)
@requires_boto_gte("2.9")
@mock_dynamodb2
def test_query_returns_consumed_capacity():
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
# Create the DynamoDB table.
table = dynamodb.create_table(
TableName="users",
KeySchema=[
{"AttributeName": "forum_name", "KeyType": "HASH"},
{"AttributeName": "subject", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "forum_name", "AttributeType": "S"},
{"AttributeName": "subject", "AttributeType": "S"},
],
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
table = dynamodb.Table("users")
table.put_item(
Item={"forum_name": "the-key", "subject": "123", "body": "some test message"}
)
results = table.query(KeyConditionExpression=Key("forum_name").eq("the-key"))
assert "ConsumedCapacity" in results
assert "CapacityUnits" in results["ConsumedCapacity"]
assert results["ConsumedCapacity"]["CapacityUnits"] == 1
@mock_dynamodb2
def test_basic_projection_expression_using_get_item():
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
@ -1435,96 +1368,6 @@ def test_nested_projection_expression_using_scan_with_attr_expression_names():
)
@mock_dynamodb2
def test_put_item_returns_consumed_capacity():
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
# Create the DynamoDB table.
table = dynamodb.create_table(
TableName="users",
KeySchema=[
{"AttributeName": "forum_name", "KeyType": "HASH"},
{"AttributeName": "subject", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "forum_name", "AttributeType": "S"},
{"AttributeName": "subject", "AttributeType": "S"},
],
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
table = dynamodb.Table("users")
response = table.put_item(
Item={"forum_name": "the-key", "subject": "123", "body": "some test message"}
)
assert "ConsumedCapacity" in response
@mock_dynamodb2
def test_update_item_returns_consumed_capacity():
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
# Create the DynamoDB table.
table = dynamodb.create_table(
TableName="users",
KeySchema=[
{"AttributeName": "forum_name", "KeyType": "HASH"},
{"AttributeName": "subject", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "forum_name", "AttributeType": "S"},
{"AttributeName": "subject", "AttributeType": "S"},
],
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
table = dynamodb.Table("users")
table.put_item(
Item={"forum_name": "the-key", "subject": "123", "body": "some test message"}
)
response = table.update_item(
Key={"forum_name": "the-key", "subject": "123"},
UpdateExpression="set body=:tb",
ExpressionAttributeValues={":tb": "a new message"},
)
assert "ConsumedCapacity" in response
assert "CapacityUnits" in response["ConsumedCapacity"]
assert "TableName" in response["ConsumedCapacity"]
@mock_dynamodb2
def test_get_item_returns_consumed_capacity():
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
# Create the DynamoDB table.
table = dynamodb.create_table(
TableName="users",
KeySchema=[
{"AttributeName": "forum_name", "KeyType": "HASH"},
{"AttributeName": "subject", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "forum_name", "AttributeType": "S"},
{"AttributeName": "subject", "AttributeType": "S"},
],
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
table = dynamodb.Table("users")
table.put_item(
Item={"forum_name": "the-key", "subject": "123", "body": "some test message"}
)
response = table.get_item(Key={"forum_name": "the-key", "subject": "123"})
assert "ConsumedCapacity" in response
assert "CapacityUnits" in response["ConsumedCapacity"]
assert "TableName" in response["ConsumedCapacity"]
@mock_dynamodb2
def test_put_empty_item():
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")

View File

@ -0,0 +1,147 @@
import boto3
import pytest
import sure # noqa
from botocore.exceptions import ClientError
from moto import mock_dynamodb2
@mock_dynamodb2
def test_error_on_wrong_value_for_consumed_capacity():
resource = boto3.resource("dynamodb", region_name="ap-northeast-3")
client = boto3.client("dynamodb", region_name="ap-northeast-3")
client.create_table(
TableName="jobs",
KeySchema=[{"AttributeName": "job_id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "job_id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
table = resource.Table("jobs")
item = {"job_id": "asdasdasd", "expires_at": "1"}
# PUT_ITEM
with pytest.raises(ClientError) as ex:
table.put_item(Item=item, ReturnConsumedCapacity="Garbage")
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.equal(
"1 validation error detected: Value 'Garbage' at 'returnConsumedCapacity' failed to satisfy constraint: Member must satisfy enum value set: [INDEXES, TOTAL, NONE]"
)
@mock_dynamodb2
def test_consumed_capacity_get_unknown_item():
conn = boto3.client("dynamodb", region_name="us-east-1")
conn.create_table(
TableName="test_table",
KeySchema=[{"AttributeName": "u", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "u", "AttributeType": "S"}],
)
response = conn.get_item(
TableName="test_table",
Key={"u": {"S": "does_not_exist"}},
ReturnConsumedCapacity="TOTAL",
)
# Should still return ConsumedCapacity, even if it does not return an item
response.should.have.key("ConsumedCapacity")
response["ConsumedCapacity"].should.equal(
{"TableName": "test_table", "CapacityUnits": 0.5}
)
@mock_dynamodb2
@pytest.mark.parametrize(
"capacity,should_have_capacity,should_have_table",
[
[None, False, False],
["NONE", False, False],
["TOTAL", True, False],
["INDEXES", True, True],
],
)
def test_only_return_consumed_capacity_when_required(
capacity, should_have_capacity, should_have_table
):
resource = boto3.resource("dynamodb", region_name="ap-northeast-3")
client = boto3.client("dynamodb", region_name="ap-northeast-3")
client.create_table(
TableName="jobs",
KeySchema=[{"AttributeName": "job_id", "KeyType": "HASH"}],
LocalSecondaryIndexes=[
{
"IndexName": "job_name-index",
"KeySchema": [{"AttributeName": "job_name", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
}
],
AttributeDefinitions=[
{"AttributeName": "job_id", "AttributeType": "S"},
{"AttributeName": "job_name", "AttributeType": "S"},
],
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
table = resource.Table("jobs")
item = {"job_id": "asdasdasd", "expires_at": "1"}
# PUT_ITEM
args = {"Item": item}
if capacity:
args["ReturnConsumedCapacity"] = capacity
response = table.put_item(**args)
validate_response(response, should_have_capacity, should_have_table)
# GET_ITEM
args = {"Key": item}
if capacity:
args["ReturnConsumedCapacity"] = capacity
response = table.get_item(**args)
validate_response(response, should_have_capacity, should_have_table, value=0.5)
# SCAN
args = {"TableName": "jobs"}
if capacity:
args["ReturnConsumedCapacity"] = capacity
response = client.scan(**args)
validate_response(response, should_have_capacity, should_have_table)
# SCAN_INDEX
args["IndexName"] = "job_name-index"
response = client.scan(**args)
validate_response(response, should_have_capacity, should_have_table, is_index=True)
# QUERY
args = {
"TableName": "jobs",
"KeyConditionExpression": "job_id = :id",
"ExpressionAttributeValues": {":id": {"S": "asdasdasd"}},
}
if capacity:
args["ReturnConsumedCapacity"] = capacity
response = client.query(**args)
validate_response(response, should_have_capacity, should_have_table)
# QUERY_INDEX
args["IndexName"] = "job_name-index"
response = client.query(**args)
validate_response(response, should_have_capacity, should_have_table, is_index=True)
def validate_response(
response, should_have_capacity, should_have_table, is_index=False, value=1.0
):
if should_have_capacity:
response.should.have.key("ConsumedCapacity")
response["ConsumedCapacity"]["TableName"].should.equal("jobs")
response["ConsumedCapacity"]["CapacityUnits"].should.equal(value)
if should_have_table:
response["ConsumedCapacity"]["Table"].should.equal({"CapacityUnits": value})
if is_index:
response["ConsumedCapacity"].should.have.key("LocalSecondaryIndexes")
response["ConsumedCapacity"]["LocalSecondaryIndexes"].should.equal(
{"job_name-index": {"CapacityUnits": value}}
)
else:
response.shouldnt.have.key("ConsumedCapacity")