Merge pull request #2879 from bblommers/feature/dynamodb_transact_write_items
Feature: DynamoDB: transact_write_items
This commit is contained in:
commit
2b255b0c5b
@ -1209,9 +1209,9 @@ class DynamoDBBackend(BaseBackend):
|
||||
table_name,
|
||||
key,
|
||||
update_expression,
|
||||
attribute_updates,
|
||||
expression_attribute_names,
|
||||
expression_attribute_values,
|
||||
attribute_updates=None,
|
||||
expected=None,
|
||||
condition_expression=None,
|
||||
):
|
||||
@ -1332,6 +1332,94 @@ class DynamoDBBackend(BaseBackend):
|
||||
|
||||
return table.ttl
|
||||
|
||||
def transact_write_items(self, transact_items):
|
||||
# Create a backup in case any of the transactions fail
|
||||
original_table_state = copy.deepcopy(self.tables)
|
||||
try:
|
||||
for item in transact_items:
|
||||
if "ConditionCheck" in item:
|
||||
item = item["ConditionCheck"]
|
||||
key = item["Key"]
|
||||
table_name = item["TableName"]
|
||||
condition_expression = item.get("ConditionExpression", None)
|
||||
expression_attribute_names = item.get(
|
||||
"ExpressionAttributeNames", None
|
||||
)
|
||||
expression_attribute_values = item.get(
|
||||
"ExpressionAttributeValues", None
|
||||
)
|
||||
current = self.get_item(table_name, key)
|
||||
|
||||
condition_op = get_filter_expression(
|
||||
condition_expression,
|
||||
expression_attribute_names,
|
||||
expression_attribute_values,
|
||||
)
|
||||
if not condition_op.expr(current):
|
||||
raise ValueError("The conditional request failed")
|
||||
elif "Put" in item:
|
||||
item = item["Put"]
|
||||
attrs = item["Item"]
|
||||
table_name = item["TableName"]
|
||||
condition_expression = item.get("ConditionExpression", None)
|
||||
expression_attribute_names = item.get(
|
||||
"ExpressionAttributeNames", None
|
||||
)
|
||||
expression_attribute_values = item.get(
|
||||
"ExpressionAttributeValues", None
|
||||
)
|
||||
self.put_item(
|
||||
table_name,
|
||||
attrs,
|
||||
condition_expression=condition_expression,
|
||||
expression_attribute_names=expression_attribute_names,
|
||||
expression_attribute_values=expression_attribute_values,
|
||||
)
|
||||
elif "Delete" in item:
|
||||
item = item["Delete"]
|
||||
key = item["Key"]
|
||||
table_name = item["TableName"]
|
||||
condition_expression = item.get("ConditionExpression", None)
|
||||
expression_attribute_names = item.get(
|
||||
"ExpressionAttributeNames", None
|
||||
)
|
||||
expression_attribute_values = item.get(
|
||||
"ExpressionAttributeValues", None
|
||||
)
|
||||
self.delete_item(
|
||||
table_name,
|
||||
key,
|
||||
condition_expression=condition_expression,
|
||||
expression_attribute_names=expression_attribute_names,
|
||||
expression_attribute_values=expression_attribute_values,
|
||||
)
|
||||
elif "Update" in item:
|
||||
item = item["Update"]
|
||||
key = item["Key"]
|
||||
table_name = item["TableName"]
|
||||
update_expression = item["UpdateExpression"]
|
||||
condition_expression = item.get("ConditionExpression", None)
|
||||
expression_attribute_names = item.get(
|
||||
"ExpressionAttributeNames", None
|
||||
)
|
||||
expression_attribute_values = item.get(
|
||||
"ExpressionAttributeValues", None
|
||||
)
|
||||
self.update_item(
|
||||
table_name,
|
||||
key,
|
||||
update_expression=update_expression,
|
||||
condition_expression=condition_expression,
|
||||
expression_attribute_names=expression_attribute_names,
|
||||
expression_attribute_values=expression_attribute_values,
|
||||
)
|
||||
else:
|
||||
raise ValueError
|
||||
except: # noqa: E722 Do not use bare except
|
||||
# Rollback to the original state, and reraise the error
|
||||
self.tables = original_table_state
|
||||
raise
|
||||
|
||||
|
||||
dynamodb_backends = {}
|
||||
for region in Session().get_available_regions("dynamodb"):
|
||||
|
@ -762,12 +762,12 @@ class DynamoHandler(BaseResponse):
|
||||
item = self.dynamodb_backend.update_item(
|
||||
name,
|
||||
key,
|
||||
update_expression,
|
||||
attribute_updates,
|
||||
expression_attribute_names,
|
||||
expression_attribute_values,
|
||||
expected,
|
||||
condition_expression,
|
||||
update_expression=update_expression,
|
||||
attribute_updates=attribute_updates,
|
||||
expression_attribute_names=expression_attribute_names,
|
||||
expression_attribute_values=expression_attribute_values,
|
||||
expected=expected,
|
||||
condition_expression=condition_expression,
|
||||
)
|
||||
except MockValidationException as mve:
|
||||
er = "com.amazonaws.dynamodb.v20111205#ValidationException"
|
||||
@ -924,3 +924,15 @@ class DynamoHandler(BaseResponse):
|
||||
result.update({"ConsumedCapacity": [v for v in consumed_capacity.values()]})
|
||||
|
||||
return dynamo_json_dump(result)
|
||||
|
||||
def transact_write_items(self):
|
||||
transact_items = self.body["TransactItems"]
|
||||
try:
|
||||
self.dynamodb_backend.transact_write_items(transact_items)
|
||||
except ValueError:
|
||||
er = "com.amazonaws.dynamodb.v20111205#ConditionalCheckFailedException"
|
||||
return self.error(
|
||||
er, "A condition specified in the operation could not be evaluated."
|
||||
)
|
||||
response = {"ConsumedCapacity": [], "ItemCollectionMetrics": {}}
|
||||
return dynamo_json_dump(response)
|
||||
|
@ -4219,6 +4219,358 @@ def test_gsi_verify_negative_number_order():
|
||||
)
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_put():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
# Put multiple items
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{
|
||||
"Put": {
|
||||
"Item": {"id": {"S": "foo{}".format(str(i))}, "foo": {"S": "bar"},},
|
||||
"TableName": "test-table",
|
||||
}
|
||||
}
|
||||
for i in range(0, 5)
|
||||
]
|
||||
)
|
||||
# Assert all are present
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(5)
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_put_conditional_expressions():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
dynamodb.put_item(
|
||||
TableName="test-table", Item={"id": {"S": "foo2"},},
|
||||
)
|
||||
# Put multiple items
|
||||
with assert_raises(ClientError) as ex:
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{
|
||||
"Put": {
|
||||
"Item": {
|
||||
"id": {"S": "foo{}".format(str(i))},
|
||||
"foo": {"S": "bar"},
|
||||
},
|
||||
"TableName": "test-table",
|
||||
"ConditionExpression": "#i <> :i",
|
||||
"ExpressionAttributeNames": {"#i": "id"},
|
||||
"ExpressionAttributeValues": {
|
||||
":i": {
|
||||
"S": "foo2"
|
||||
} # This item already exist, so the ConditionExpression should fail
|
||||
},
|
||||
}
|
||||
}
|
||||
for i in range(0, 5)
|
||||
]
|
||||
)
|
||||
# Assert the exception is correct
|
||||
ex.exception.response["Error"]["Code"].should.equal(
|
||||
"ConditionalCheckFailedException"
|
||||
)
|
||||
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||
ex.exception.response["Error"]["Message"].should.equal(
|
||||
"A condition specified in the operation could not be evaluated."
|
||||
)
|
||||
# Assert all are present
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(1)
|
||||
items[0].should.equal({"id": {"S": "foo2"}})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_conditioncheck_passes():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
# Insert an item without email address
|
||||
dynamodb.put_item(
|
||||
TableName="test-table", Item={"id": {"S": "foo"},},
|
||||
)
|
||||
# Put an email address, after verifying it doesn't exist yet
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{
|
||||
"ConditionCheck": {
|
||||
"Key": {"id": {"S": "foo"}},
|
||||
"TableName": "test-table",
|
||||
"ConditionExpression": "attribute_not_exists(#e)",
|
||||
"ExpressionAttributeNames": {"#e": "email_address"},
|
||||
}
|
||||
},
|
||||
{
|
||||
"Put": {
|
||||
"Item": {
|
||||
"id": {"S": "foo"},
|
||||
"email_address": {"S": "test@moto.com"},
|
||||
},
|
||||
"TableName": "test-table",
|
||||
}
|
||||
},
|
||||
]
|
||||
)
|
||||
# Assert all are present
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(1)
|
||||
items[0].should.equal({"email_address": {"S": "test@moto.com"}, "id": {"S": "foo"}})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_conditioncheck_fails():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
# Insert an item with email address
|
||||
dynamodb.put_item(
|
||||
TableName="test-table",
|
||||
Item={"id": {"S": "foo"}, "email_address": {"S": "test@moto.com"}},
|
||||
)
|
||||
# Try to put an email address, but verify whether it exists
|
||||
# ConditionCheck should fail
|
||||
with assert_raises(ClientError) as ex:
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{
|
||||
"ConditionCheck": {
|
||||
"Key": {"id": {"S": "foo"}},
|
||||
"TableName": "test-table",
|
||||
"ConditionExpression": "attribute_not_exists(#e)",
|
||||
"ExpressionAttributeNames": {"#e": "email_address"},
|
||||
}
|
||||
},
|
||||
{
|
||||
"Put": {
|
||||
"Item": {
|
||||
"id": {"S": "foo"},
|
||||
"email_address": {"S": "update@moto.com"},
|
||||
},
|
||||
"TableName": "test-table",
|
||||
}
|
||||
},
|
||||
]
|
||||
)
|
||||
# Assert the exception is correct
|
||||
ex.exception.response["Error"]["Code"].should.equal(
|
||||
"ConditionalCheckFailedException"
|
||||
)
|
||||
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||
ex.exception.response["Error"]["Message"].should.equal(
|
||||
"A condition specified in the operation could not be evaluated."
|
||||
)
|
||||
|
||||
# Assert the original email address is still present
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(1)
|
||||
items[0].should.equal({"email_address": {"S": "test@moto.com"}, "id": {"S": "foo"}})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_delete():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
# Insert an item
|
||||
dynamodb.put_item(
|
||||
TableName="test-table", Item={"id": {"S": "foo"},},
|
||||
)
|
||||
# Delete the item
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{"Delete": {"Key": {"id": {"S": "foo"}}, "TableName": "test-table",}}
|
||||
]
|
||||
)
|
||||
# Assert the item is deleted
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_delete_with_successful_condition_expression():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
# Insert an item without email address
|
||||
dynamodb.put_item(
|
||||
TableName="test-table", Item={"id": {"S": "foo"},},
|
||||
)
|
||||
# ConditionExpression will pass - no email address has been specified yet
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{
|
||||
"Delete": {
|
||||
"Key": {"id": {"S": "foo"},},
|
||||
"TableName": "test-table",
|
||||
"ConditionExpression": "attribute_not_exists(#e)",
|
||||
"ExpressionAttributeNames": {"#e": "email_address"},
|
||||
}
|
||||
}
|
||||
]
|
||||
)
|
||||
# Assert the item is deleted
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_delete_with_failed_condition_expression():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
# Insert an item with email address
|
||||
dynamodb.put_item(
|
||||
TableName="test-table",
|
||||
Item={"id": {"S": "foo"}, "email_address": {"S": "test@moto.com"}},
|
||||
)
|
||||
# Try to delete an item that does not have an email address
|
||||
# ConditionCheck should fail
|
||||
with assert_raises(ClientError) as ex:
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{
|
||||
"Delete": {
|
||||
"Key": {"id": {"S": "foo"},},
|
||||
"TableName": "test-table",
|
||||
"ConditionExpression": "attribute_not_exists(#e)",
|
||||
"ExpressionAttributeNames": {"#e": "email_address"},
|
||||
}
|
||||
}
|
||||
]
|
||||
)
|
||||
# Assert the exception is correct
|
||||
ex.exception.response["Error"]["Code"].should.equal(
|
||||
"ConditionalCheckFailedException"
|
||||
)
|
||||
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||
ex.exception.response["Error"]["Message"].should.equal(
|
||||
"A condition specified in the operation could not be evaluated."
|
||||
)
|
||||
# Assert the original item is still present
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(1)
|
||||
items[0].should.equal({"email_address": {"S": "test@moto.com"}, "id": {"S": "foo"}})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_update():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
# Insert an item
|
||||
dynamodb.put_item(TableName="test-table", Item={"id": {"S": "foo"}})
|
||||
# Update the item
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{
|
||||
"Update": {
|
||||
"Key": {"id": {"S": "foo"}},
|
||||
"TableName": "test-table",
|
||||
"UpdateExpression": "SET #e = :v",
|
||||
"ExpressionAttributeNames": {"#e": "email_address"},
|
||||
"ExpressionAttributeValues": {":v": {"S": "test@moto.com"}},
|
||||
}
|
||||
}
|
||||
]
|
||||
)
|
||||
# Assert the item is updated
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(1)
|
||||
items[0].should.equal({"id": {"S": "foo"}, "email_address": {"S": "test@moto.com"}})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_transact_write_items_update_with_failed_condition_expression():
|
||||
table_schema = {
|
||||
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
|
||||
"AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"},],
|
||||
}
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
dynamodb.create_table(
|
||||
TableName="test-table", BillingMode="PAY_PER_REQUEST", **table_schema
|
||||
)
|
||||
# Insert an item with email address
|
||||
dynamodb.put_item(
|
||||
TableName="test-table",
|
||||
Item={"id": {"S": "foo"}, "email_address": {"S": "test@moto.com"}},
|
||||
)
|
||||
# Try to update an item that does not have an email address
|
||||
# ConditionCheck should fail
|
||||
with assert_raises(ClientError) as ex:
|
||||
dynamodb.transact_write_items(
|
||||
TransactItems=[
|
||||
{
|
||||
"Update": {
|
||||
"Key": {"id": {"S": "foo"}},
|
||||
"TableName": "test-table",
|
||||
"UpdateExpression": "SET #e = :v",
|
||||
"ConditionExpression": "attribute_not_exists(#e)",
|
||||
"ExpressionAttributeNames": {"#e": "email_address"},
|
||||
"ExpressionAttributeValues": {":v": {"S": "update@moto.com"}},
|
||||
}
|
||||
}
|
||||
]
|
||||
)
|
||||
# Assert the exception is correct
|
||||
ex.exception.response["Error"]["Code"].should.equal(
|
||||
"ConditionalCheckFailedException"
|
||||
)
|
||||
ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
|
||||
ex.exception.response["Error"]["Message"].should.equal(
|
||||
"A condition specified in the operation could not be evaluated."
|
||||
)
|
||||
# Assert the original item is still present
|
||||
items = dynamodb.scan(TableName="test-table")["Items"]
|
||||
items.should.have.length_of(1)
|
||||
items[0].should.equal({"email_address": {"S": "test@moto.com"}, "id": {"S": "foo"}})
|
||||
|
||||
|
||||
@mock_dynamodb2
|
||||
def test_dynamodb_max_1mb_limit():
|
||||
ddb = boto3.resource("dynamodb", region_name="eu-west-1")
|
||||
|
Loading…
Reference in New Issue
Block a user