DynamoDB: Initial support for execute_statement et. al. (#6216)

This commit is contained in:
Bert Blommers 2023-04-16 19:06:29 +00:00 committed by GitHub
parent af841303d0
commit 822d94f59e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 345 additions and 16 deletions

View File

@ -1678,9 +1678,9 @@
## dynamodb ## dynamodb
<details> <details>
<summary>54% implemented</summary> <summary>60% implemented</summary>
- [ ] batch_execute_statement - [X] batch_execute_statement
- [X] batch_get_item - [X] batch_get_item
- [X] batch_write_item - [X] batch_write_item
- [X] create_backup - [X] create_backup
@ -1704,8 +1704,8 @@
- [X] describe_time_to_live - [X] describe_time_to_live
- [ ] disable_kinesis_streaming_destination - [ ] disable_kinesis_streaming_destination
- [ ] enable_kinesis_streaming_destination - [ ] enable_kinesis_streaming_destination
- [ ] execute_statement - [X] execute_statement
- [ ] execute_transaction - [X] execute_transaction
- [ ] export_table_to_point_in_time - [ ] export_table_to_point_in_time
- [X] get_item - [X] get_item
- [ ] import_table - [ ] import_table

View File

@ -25,7 +25,11 @@ dynamodb
|start-h3| Implemented features for this service |end-h3| |start-h3| Implemented features for this service |end-h3|
- [ ] batch_execute_statement - [X] batch_execute_statement
Please see the documentation for `execute_statement` to see the limitations of what is supported.
- [X] batch_get_item - [X] batch_get_item
- [X] batch_write_item - [X] batch_write_item
- [X] create_backup - [X] create_backup
@ -49,8 +53,20 @@ dynamodb
- [X] describe_time_to_live - [X] describe_time_to_live
- [ ] disable_kinesis_streaming_destination - [ ] disable_kinesis_streaming_destination
- [ ] enable_kinesis_streaming_destination - [ ] enable_kinesis_streaming_destination
- [ ] execute_statement - [X] execute_statement
- [ ] execute_transaction
Only SELECT-statements are supported for now.
Pagination is not yet implemented.
Parsing of the statement is highly experimental - please raise an issue if you find any bugs.
- [X] execute_transaction
Please see the documentation for `execute_statement` to see the limitations of what is supported.
- [ ] export_table_to_point_in_time - [ ] export_table_to_point_in_time
- [X] get_item - [X] get_item
- [ ] import_table - [ ] import_table

View File

@ -1,4 +1,5 @@
import copy import copy
import json
import re import re
from collections import OrderedDict from collections import OrderedDict
@ -25,6 +26,7 @@ from moto.dynamodb.exceptions import (
TransactWriteSingleOpException, TransactWriteSingleOpException,
) )
from moto.dynamodb.models.dynamo_type import DynamoType, Item from moto.dynamodb.models.dynamo_type import DynamoType, Item
from moto.dynamodb.models.dynamo_type import serializer, deserializer
from moto.dynamodb.models.table import ( from moto.dynamodb.models.table import (
Table, Table,
RestoredTable, RestoredTable,
@ -35,6 +37,7 @@ from moto.dynamodb.models.table import (
from moto.dynamodb.parsing.executors import UpdateExpressionExecutor from moto.dynamodb.parsing.executors import UpdateExpressionExecutor
from moto.dynamodb.parsing.expressions import UpdateExpressionParser # type: ignore from moto.dynamodb.parsing.expressions import UpdateExpressionParser # type: ignore
from moto.dynamodb.parsing.validators import UpdateExpressionValidator from moto.dynamodb.parsing.validators import UpdateExpressionValidator
from moto.dynamodb.parsing import partiql
class DynamoDBBackend(BaseBackend): class DynamoDBBackend(BaseBackend):
@ -770,5 +773,89 @@ class DynamoDBBackend(BaseBackend):
def transact_get_items(self) -> None: def transact_get_items(self) -> None:
pass pass
def execute_statement(
self, statement: str, parameters: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Only SELECT-statements are supported for now.
Pagination is not yet implemented.
Parsing is highly experimental - please raise an issue if you find any bugs.
"""
# We need to execute a statement - but we don't know which table
# Just pass all tables to PartiQL
source_data: Dict[str, str] = dict()
for table in self.tables.values():
source_data[table.name] = "\n".join(
[json.dumps(item.to_regular_json()) for item in table.all_items()]
)
# Parameters are in DynamoDB JSON form ({"S": "value"}) - we only want the value itself
parameters = [deserializer.deserialize(param) for param in parameters]
regular_json = partiql.query(statement, source_data, parameters)
return [
{key: serializer.serialize(value) for key, value in item.items()}
for item in regular_json
]
def execute_transaction(
self, statements: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Please see the documentation for `execute_statement` to see the limitations of what is supported.
"""
responses = []
for stmt in statements:
items = self.execute_statement(
statement=stmt["Statement"], parameters=stmt.get("Parameters", [])
)
responses.extend([{"Item": item} for item in items])
return responses
def batch_execute_statement(
self, statements: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Please see the documentation for `execute_statement` to see the limitations of what is supported.
"""
responses = []
# Validation
for stmt in statements:
metadata = partiql.get_query_metadata(stmt["Statement"])
table_name = metadata.get_table_names()[0]
response = {}
filter_keys = metadata.get_filter_names()
if table_name not in self.tables:
response["Error"] = {
"Code": "ResourceNotFound",
"Message": "Requested resource not found",
}
else:
response["TableName"] = table_name
table = self.tables[table_name]
for required_attr in table.table_key_attrs:
if required_attr not in filter_keys:
response["Error"] = {
"Code": "ValidationError",
"Message": "Select statements within BatchExecuteStatement must specify the primary key in the where clause.",
}
responses.append(response)
# Execution
for idx, stmt in enumerate(statements):
if "Error" in responses[idx]:
continue
items = self.execute_statement(
statement=stmt["Statement"], parameters=stmt.get("Parameters", [])
)
# Statements should always contain a HashKey and SortKey
# An item with those keys may not exist
if items:
# But if it does, it will always only contain one item at most
responses[idx]["Item"] = items[0]
return responses
dynamodb_backends = BackendDict(DynamoDBBackend, "dynamodb") dynamodb_backends = BackendDict(DynamoDBBackend, "dynamodb")

View File

@ -1,4 +1,5 @@
import decimal import decimal
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from typing import Any, Dict, List, Union, Optional from typing import Any, Dict, List, Union, Optional
from moto.core import BaseModel from moto.core import BaseModel
@ -9,6 +10,9 @@ from moto.dynamodb.exceptions import (
) )
from moto.dynamodb.models.utilities import bytesize from moto.dynamodb.models.utilities import bytesize
deserializer = TypeDeserializer()
serializer = TypeSerializer()
class DDBType: class DDBType:
""" """
@ -303,8 +307,14 @@ class Item(BaseModel):
return {"Attributes": attributes} return {"Attributes": attributes}
def to_regular_json(self) -> Dict[str, Any]:
attributes = {}
for key, attribute in self.attrs.items():
attributes[key] = deserializer.deserialize(attribute.to_json())
return attributes
def describe_attrs( def describe_attrs(
self, attributes: Optional[Dict[str, Any]] self, attributes: Optional[Dict[str, Any]] = None
) -> Dict[str, Dict[str, Any]]: ) -> Dict[str, Dict[str, Any]]:
if attributes: if attributes:
included = {} included = {}

View File

@ -0,0 +1,15 @@
from typing import Any, Dict, List
def query(
statement: str, source_data: Dict[str, str], parameters: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
from py_partiql_parser import DynamoDBStatementParser
return DynamoDBStatementParser(source_data).parse(statement, parameters)
def get_query_metadata(statement: str) -> Any:
from py_partiql_parser import DynamoDBStatementParser
return DynamoDBStatementParser.get_query_metadata(query=statement)

View File

@ -1095,3 +1095,21 @@ class DynamoHandler(BaseResponse):
target_table_name, source_table_name target_table_name, source_table_name
) )
return dynamo_json_dump(restored_table.describe()) return dynamo_json_dump(restored_table.describe())
def execute_statement(self) -> str:
stmt = self.body.get("Statement", "")
parameters = self.body.get("Parameters", [])
items = self.dynamodb_backend.execute_statement(
statement=stmt, parameters=parameters
)
return dynamo_json_dump({"Items": items})
def execute_transaction(self) -> str:
stmts = self.body.get("TransactStatements", [])
items = self.dynamodb_backend.execute_transaction(stmts)
return dynamo_json_dump({"Responses": items})
def batch_execute_statement(self) -> str:
stmts = self.body.get("Statements", [])
items = self.dynamodb_backend.batch_execute_statement(stmts)
return dynamo_json_dump({"Responses": items})

View File

@ -4,9 +4,9 @@ from typing import List
def parse_query(text_input, query): def parse_query(text_input, query):
from py_partiql_parser import Parser from py_partiql_parser import S3SelectParser
return Parser(source_data={"s3object": text_input}).parse(query) return S3SelectParser(source_data={"s3object": text_input}).parse(query)
def _create_header(key: bytes, value: bytes): def _create_header(key: bytes, value: bytes):

View File

@ -52,7 +52,7 @@ all =
openapi-spec-validator>=0.2.8 openapi-spec-validator>=0.2.8
pyparsing>=3.0.7 pyparsing>=3.0.7
jsondiff>=1.1.2 jsondiff>=1.1.2
py-partiql-parser==0.1.0 py-partiql-parser==0.3.0
aws-xray-sdk!=0.96,>=0.93 aws-xray-sdk!=0.96,>=0.93
setuptools setuptools
server = server =
@ -66,7 +66,7 @@ server =
openapi-spec-validator>=0.2.8 openapi-spec-validator>=0.2.8
pyparsing>=3.0.7 pyparsing>=3.0.7
jsondiff>=1.1.2 jsondiff>=1.1.2
py-partiql-parser==0.1.0 py-partiql-parser==0.3.0
aws-xray-sdk!=0.96,>=0.93 aws-xray-sdk!=0.96,>=0.93
setuptools setuptools
flask!=2.2.0,!=2.2.1 flask!=2.2.0,!=2.2.1
@ -100,7 +100,7 @@ cloudformation =
openapi-spec-validator>=0.2.8 openapi-spec-validator>=0.2.8
pyparsing>=3.0.7 pyparsing>=3.0.7
jsondiff>=1.1.2 jsondiff>=1.1.2
py-partiql-parser==0.1.0 py-partiql-parser==0.3.0
aws-xray-sdk!=0.96,>=0.93 aws-xray-sdk!=0.96,>=0.93
setuptools setuptools
cloudfront = cloudfront =
@ -121,8 +121,12 @@ datasync =
dax = dax =
dms = dms =
ds = sshpubkeys>=3.1.0 ds = sshpubkeys>=3.1.0
dynamodb = docker>=3.0.0 dynamodb =
dynamodbstreams = docker>=3.0.0 docker>=3.0.0
py-partiql-parser==0.3.0
dynamodbstreams =
docker>=3.0.0
py-partiql-parser==0.3.0
ebs = sshpubkeys>=3.1.0 ebs = sshpubkeys>=3.1.0
ec2 = sshpubkeys>=3.1.0 ec2 = sshpubkeys>=3.1.0
ec2instanceconnect = ec2instanceconnect =
@ -179,7 +183,7 @@ route53 =
route53resolver = sshpubkeys>=3.1.0 route53resolver = sshpubkeys>=3.1.0
s3 = s3 =
PyYAML>=5.1 PyYAML>=5.1
py-partiql-parser==0.1.0 py-partiql-parser==0.3.0
s3control = s3control =
sagemaker = sagemaker =
sdb = sdb =

View File

@ -0,0 +1,179 @@
import boto3
from moto import mock_dynamodb
from unittest import TestCase
class TestSelectStatements:
mock = mock_dynamodb()
@classmethod
def setup_class(cls):
cls.mock.start()
cls.client = boto3.client("dynamodb", "us-east-1")
cls.client.create_table(
TableName="messages",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 5},
)
cls.item1 = {"id": {"S": "msg1"}, "body": {"S": "some text"}}
cls.item2 = {"id": {"S": "msg2"}, "body": {"S": "n/a"}, "unique": {"S": "key"}}
cls.client.put_item(TableName="messages", Item=cls.item1)
cls.client.put_item(TableName="messages", Item=cls.item2)
@classmethod
def teardown_class(cls):
try:
cls.mock.stop()
except RuntimeError:
pass
def test_execute_statement_select_star(self):
items = TestSelectStatements.client.execute_statement(
Statement="select * from messages"
)["Items"]
assert TestSelectStatements.item1 in items
assert TestSelectStatements.item2 in items
def test_execute_statement_select_unique(self):
items = TestSelectStatements.client.execute_statement(
Statement="select unique from messages"
)["Items"]
assert {} in items
assert {"unique": {"S": "key"}} in items
def test_execute_statement_with_parameter(self):
stmt = "select * from messages where id = ?"
items = TestSelectStatements.client.execute_statement(
Statement=stmt, Parameters=[{"S": "msg1"}]
)["Items"]
assert len(items) == 1
assert TestSelectStatements.item1 in items
stmt = "select id from messages where id = ?"
items = TestSelectStatements.client.execute_statement(
Statement=stmt, Parameters=[{"S": "msg1"}]
)["Items"]
assert len(items) == 1
assert {"id": {"S": "msg1"}} in items
def test_execute_statement_with_no_results(self):
stmt = "select * from messages where id = ?"
items = TestSelectStatements.client.execute_statement(
Statement=stmt, Parameters=[{"S": "msg3"}]
)["Items"]
assert items == []
@mock_dynamodb
class TestExecuteTransaction(TestCase):
def setUp(self):
self.client = boto3.client("dynamodb", "us-east-1")
self.client.create_table(
TableName="messages",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 5},
)
self.item1 = {"id": {"S": "msg1"}, "body": {"S": "some text"}}
self.item2 = {"id": {"S": "msg2"}, "body": {"S": "n/a"}, "unique": {"S": "key"}}
self.client.put_item(TableName="messages", Item=self.item1)
self.client.put_item(TableName="messages", Item=self.item2)
def test_execute_transaction(self):
items = self.client.execute_transaction(
TransactStatements=[
{"Statement": "select id from messages"},
{
"Statement": "select * from messages where id = ?",
"Parameters": [{"S": "msg2"}],
},
]
)["Responses"]
assert len(items) == 3
@mock_dynamodb
class TestBatchExecuteStatement(TestCase):
def setUp(self):
self.client = boto3.client("dynamodb", "us-east-1")
for name in ["table1", "table2"]:
self.client.create_table(
TableName=name,
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 5},
)
self.item1 = {"id": {"S": "msg1"}, "body": {"S": "some text"}}
self.item2 = {"id": {"S": "msg2"}, "body": {"S": "n/a"}, "unique": {"S": "key"}}
self.client.put_item(TableName="table1", Item=self.item1)
self.client.put_item(TableName="table1", Item=self.item2)
self.client.put_item(TableName="table2", Item=self.item1)
def test_execute_transaction(self):
items = self.client.batch_execute_statement(
Statements=[
{
"Statement": "select id from table1 where id = ?",
"Parameters": [{"S": "msg1"}],
},
{
"Statement": "select * from table2 where id = ?",
"Parameters": [{"S": "msg1"}],
},
{
"Statement": "select * from table2 where id = ?",
"Parameters": [{"S": "msg2"}],
},
]
)["Responses"]
assert len(items) == 3
assert {"TableName": "table1", "Item": {"id": {"S": "msg1"}}} in items
assert {"TableName": "table2", "Item": self.item1} in items
assert {"TableName": "table2"} in items
def test_without_primary_key_in_where_clause(self):
items = self.client.batch_execute_statement(
Statements=[
# Unknown table
{"Statement": "select id from unknown-table"},
# No WHERE-clause
{"Statement": "select id from table1"},
# WHERE-clause does not contain HashKey
{
"Statement": "select * from table1 where body = ?",
"Parameters": [{"S": "msg1"}],
},
# Valid WHERE-clause
{
"Statement": "select * from table2 where id = ?",
"Parameters": [{"S": "msg1"}],
},
]
)["Responses"]
assert len(items) == 4
assert {
"Error": {
"Code": "ResourceNotFound",
"Message": "Requested resource not found",
}
} in items
assert {
"Error": {
"Code": "ValidationError",
"Message": "Select statements within BatchExecuteStatement must "
"specify the primary key in the where clause.",
},
"TableName": "table1",
} in items
assert {
"Error": {
"Code": "ValidationError",
"Message": "Select statements within BatchExecuteStatement must "
"specify the primary key in the where clause.",
},
"TableName": "table1",
} in items
assert {"TableName": "table2", "Item": self.item1} in items