From 822d94f59ed4fbd4cdad8f59285f974a9fc489fa Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sun, 16 Apr 2023 19:06:29 +0000 Subject: [PATCH] DynamoDB: Initial support for execute_statement et. al. (#6216) --- IMPLEMENTATION_COVERAGE.md | 8 +- docs/docs/services/dynamodb.rst | 22 ++- moto/dynamodb/models/__init__.py | 87 +++++++++ moto/dynamodb/models/dynamo_type.py | 12 +- moto/dynamodb/parsing/partiql.py | 15 ++ moto/dynamodb/responses.py | 18 ++ moto/s3/select_object_content.py | 4 +- setup.cfg | 16 +- .../test_dynamodb/test_dynamodb_statements.py | 179 ++++++++++++++++++ 9 files changed, 345 insertions(+), 16 deletions(-) create mode 100644 moto/dynamodb/parsing/partiql.py create mode 100644 tests/test_dynamodb/test_dynamodb_statements.py diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 641ba7cdb..6c53e5894 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -1678,9 +1678,9 @@ ## dynamodb
-54% implemented +60% implemented -- [ ] batch_execute_statement +- [X] batch_execute_statement - [X] batch_get_item - [X] batch_write_item - [X] create_backup @@ -1704,8 +1704,8 @@ - [X] describe_time_to_live - [ ] disable_kinesis_streaming_destination - [ ] enable_kinesis_streaming_destination -- [ ] execute_statement -- [ ] execute_transaction +- [X] execute_statement +- [X] execute_transaction - [ ] export_table_to_point_in_time - [X] get_item - [ ] import_table diff --git a/docs/docs/services/dynamodb.rst b/docs/docs/services/dynamodb.rst index be78bb51e..a79fd1b8b 100644 --- a/docs/docs/services/dynamodb.rst +++ b/docs/docs/services/dynamodb.rst @@ -25,7 +25,11 @@ dynamodb |start-h3| Implemented features for this service |end-h3| -- [ ] batch_execute_statement +- [X] batch_execute_statement + + Please see the documentation for `execute_statement` to see the limitations of what is supported. + + - [X] batch_get_item - [X] batch_write_item - [X] create_backup @@ -49,8 +53,20 @@ dynamodb - [X] describe_time_to_live - [ ] disable_kinesis_streaming_destination - [ ] enable_kinesis_streaming_destination -- [ ] execute_statement -- [ ] execute_transaction +- [X] execute_statement + + Only SELECT-statements are supported for now. + + Pagination is not yet implemented. + + Parsing of the statement is highly experimental - please raise an issue if you find any bugs. + + +- [X] execute_transaction + + Please see the documentation for `execute_statement` to see the limitations of what is supported. + + - [ ] export_table_to_point_in_time - [X] get_item - [ ] import_table diff --git a/moto/dynamodb/models/__init__.py b/moto/dynamodb/models/__init__.py index d0ac4f068..d2830ead2 100644 --- a/moto/dynamodb/models/__init__.py +++ b/moto/dynamodb/models/__init__.py @@ -1,4 +1,5 @@ import copy +import json import re from collections import OrderedDict @@ -25,6 +26,7 @@ from moto.dynamodb.exceptions import ( TransactWriteSingleOpException, ) from moto.dynamodb.models.dynamo_type import DynamoType, Item +from moto.dynamodb.models.dynamo_type import serializer, deserializer from moto.dynamodb.models.table import ( Table, RestoredTable, @@ -35,6 +37,7 @@ from moto.dynamodb.models.table import ( from moto.dynamodb.parsing.executors import UpdateExpressionExecutor from moto.dynamodb.parsing.expressions import UpdateExpressionParser # type: ignore from moto.dynamodb.parsing.validators import UpdateExpressionValidator +from moto.dynamodb.parsing import partiql class DynamoDBBackend(BaseBackend): @@ -770,5 +773,89 @@ class DynamoDBBackend(BaseBackend): def transact_get_items(self) -> None: pass + def execute_statement( + self, statement: str, parameters: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """ + Only SELECT-statements are supported for now. + + Pagination is not yet implemented. + + Parsing is highly experimental - please raise an issue if you find any bugs. + """ + # We need to execute a statement - but we don't know which table + # Just pass all tables to PartiQL + source_data: Dict[str, str] = dict() + for table in self.tables.values(): + source_data[table.name] = "\n".join( + [json.dumps(item.to_regular_json()) for item in table.all_items()] + ) + + # Parameters are in DynamoDB JSON form ({"S": "value"}) - we only want the value itself + parameters = [deserializer.deserialize(param) for param in parameters] + + regular_json = partiql.query(statement, source_data, parameters) + return [ + {key: serializer.serialize(value) for key, value in item.items()} + for item in regular_json + ] + + def execute_transaction( + self, statements: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """ + Please see the documentation for `execute_statement` to see the limitations of what is supported. + """ + responses = [] + for stmt in statements: + items = self.execute_statement( + statement=stmt["Statement"], parameters=stmt.get("Parameters", []) + ) + responses.extend([{"Item": item} for item in items]) + return responses + + def batch_execute_statement( + self, statements: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """ + Please see the documentation for `execute_statement` to see the limitations of what is supported. + """ + responses = [] + # Validation + for stmt in statements: + metadata = partiql.get_query_metadata(stmt["Statement"]) + table_name = metadata.get_table_names()[0] + response = {} + filter_keys = metadata.get_filter_names() + if table_name not in self.tables: + response["Error"] = { + "Code": "ResourceNotFound", + "Message": "Requested resource not found", + } + else: + response["TableName"] = table_name + table = self.tables[table_name] + for required_attr in table.table_key_attrs: + if required_attr not in filter_keys: + response["Error"] = { + "Code": "ValidationError", + "Message": "Select statements within BatchExecuteStatement must specify the primary key in the where clause.", + } + responses.append(response) + + # Execution + for idx, stmt in enumerate(statements): + if "Error" in responses[idx]: + continue + items = self.execute_statement( + statement=stmt["Statement"], parameters=stmt.get("Parameters", []) + ) + # Statements should always contain a HashKey and SortKey + # An item with those keys may not exist + if items: + # But if it does, it will always only contain one item at most + responses[idx]["Item"] = items[0] + return responses + dynamodb_backends = BackendDict(DynamoDBBackend, "dynamodb") diff --git a/moto/dynamodb/models/dynamo_type.py b/moto/dynamodb/models/dynamo_type.py index d219b909a..38ba27e99 100644 --- a/moto/dynamodb/models/dynamo_type.py +++ b/moto/dynamodb/models/dynamo_type.py @@ -1,4 +1,5 @@ import decimal +from boto3.dynamodb.types import TypeDeserializer, TypeSerializer from typing import Any, Dict, List, Union, Optional from moto.core import BaseModel @@ -9,6 +10,9 @@ from moto.dynamodb.exceptions import ( ) from moto.dynamodb.models.utilities import bytesize +deserializer = TypeDeserializer() +serializer = TypeSerializer() + class DDBType: """ @@ -303,8 +307,14 @@ class Item(BaseModel): return {"Attributes": attributes} + def to_regular_json(self) -> Dict[str, Any]: + attributes = {} + for key, attribute in self.attrs.items(): + attributes[key] = deserializer.deserialize(attribute.to_json()) + return attributes + def describe_attrs( - self, attributes: Optional[Dict[str, Any]] + self, attributes: Optional[Dict[str, Any]] = None ) -> Dict[str, Dict[str, Any]]: if attributes: included = {} diff --git a/moto/dynamodb/parsing/partiql.py b/moto/dynamodb/parsing/partiql.py new file mode 100644 index 000000000..78a92c345 --- /dev/null +++ b/moto/dynamodb/parsing/partiql.py @@ -0,0 +1,15 @@ +from typing import Any, Dict, List + + +def query( + statement: str, source_data: Dict[str, str], parameters: List[Dict[str, Any]] +) -> List[Dict[str, Any]]: + from py_partiql_parser import DynamoDBStatementParser + + return DynamoDBStatementParser(source_data).parse(statement, parameters) + + +def get_query_metadata(statement: str) -> Any: + from py_partiql_parser import DynamoDBStatementParser + + return DynamoDBStatementParser.get_query_metadata(query=statement) diff --git a/moto/dynamodb/responses.py b/moto/dynamodb/responses.py index 2108f4b4f..bd805c0ea 100644 --- a/moto/dynamodb/responses.py +++ b/moto/dynamodb/responses.py @@ -1095,3 +1095,21 @@ class DynamoHandler(BaseResponse): target_table_name, source_table_name ) return dynamo_json_dump(restored_table.describe()) + + def execute_statement(self) -> str: + stmt = self.body.get("Statement", "") + parameters = self.body.get("Parameters", []) + items = self.dynamodb_backend.execute_statement( + statement=stmt, parameters=parameters + ) + return dynamo_json_dump({"Items": items}) + + def execute_transaction(self) -> str: + stmts = self.body.get("TransactStatements", []) + items = self.dynamodb_backend.execute_transaction(stmts) + return dynamo_json_dump({"Responses": items}) + + def batch_execute_statement(self) -> str: + stmts = self.body.get("Statements", []) + items = self.dynamodb_backend.batch_execute_statement(stmts) + return dynamo_json_dump({"Responses": items}) diff --git a/moto/s3/select_object_content.py b/moto/s3/select_object_content.py index a712fc5af..58a02a8b2 100644 --- a/moto/s3/select_object_content.py +++ b/moto/s3/select_object_content.py @@ -4,9 +4,9 @@ from typing import List def parse_query(text_input, query): - from py_partiql_parser import Parser + from py_partiql_parser import S3SelectParser - return Parser(source_data={"s3object": text_input}).parse(query) + return S3SelectParser(source_data={"s3object": text_input}).parse(query) def _create_header(key: bytes, value: bytes): diff --git a/setup.cfg b/setup.cfg index 0cb467bd5..479e424fd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -52,7 +52,7 @@ all = openapi-spec-validator>=0.2.8 pyparsing>=3.0.7 jsondiff>=1.1.2 - py-partiql-parser==0.1.0 + py-partiql-parser==0.3.0 aws-xray-sdk!=0.96,>=0.93 setuptools server = @@ -66,7 +66,7 @@ server = openapi-spec-validator>=0.2.8 pyparsing>=3.0.7 jsondiff>=1.1.2 - py-partiql-parser==0.1.0 + py-partiql-parser==0.3.0 aws-xray-sdk!=0.96,>=0.93 setuptools flask!=2.2.0,!=2.2.1 @@ -100,7 +100,7 @@ cloudformation = openapi-spec-validator>=0.2.8 pyparsing>=3.0.7 jsondiff>=1.1.2 - py-partiql-parser==0.1.0 + py-partiql-parser==0.3.0 aws-xray-sdk!=0.96,>=0.93 setuptools cloudfront = @@ -121,8 +121,12 @@ datasync = dax = dms = ds = sshpubkeys>=3.1.0 -dynamodb = docker>=3.0.0 -dynamodbstreams = docker>=3.0.0 +dynamodb = + docker>=3.0.0 + py-partiql-parser==0.3.0 +dynamodbstreams = + docker>=3.0.0 + py-partiql-parser==0.3.0 ebs = sshpubkeys>=3.1.0 ec2 = sshpubkeys>=3.1.0 ec2instanceconnect = @@ -179,7 +183,7 @@ route53 = route53resolver = sshpubkeys>=3.1.0 s3 = PyYAML>=5.1 - py-partiql-parser==0.1.0 + py-partiql-parser==0.3.0 s3control = sagemaker = sdb = diff --git a/tests/test_dynamodb/test_dynamodb_statements.py b/tests/test_dynamodb/test_dynamodb_statements.py new file mode 100644 index 000000000..871026ad2 --- /dev/null +++ b/tests/test_dynamodb/test_dynamodb_statements.py @@ -0,0 +1,179 @@ +import boto3 + +from moto import mock_dynamodb +from unittest import TestCase + + +class TestSelectStatements: + + mock = mock_dynamodb() + + @classmethod + def setup_class(cls): + cls.mock.start() + cls.client = boto3.client("dynamodb", "us-east-1") + cls.client.create_table( + TableName="messages", + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], + AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}], + ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 5}, + ) + cls.item1 = {"id": {"S": "msg1"}, "body": {"S": "some text"}} + cls.item2 = {"id": {"S": "msg2"}, "body": {"S": "n/a"}, "unique": {"S": "key"}} + cls.client.put_item(TableName="messages", Item=cls.item1) + cls.client.put_item(TableName="messages", Item=cls.item2) + + @classmethod + def teardown_class(cls): + try: + cls.mock.stop() + except RuntimeError: + pass + + def test_execute_statement_select_star(self): + items = TestSelectStatements.client.execute_statement( + Statement="select * from messages" + )["Items"] + assert TestSelectStatements.item1 in items + assert TestSelectStatements.item2 in items + + def test_execute_statement_select_unique(self): + items = TestSelectStatements.client.execute_statement( + Statement="select unique from messages" + )["Items"] + assert {} in items + assert {"unique": {"S": "key"}} in items + + def test_execute_statement_with_parameter(self): + stmt = "select * from messages where id = ?" + items = TestSelectStatements.client.execute_statement( + Statement=stmt, Parameters=[{"S": "msg1"}] + )["Items"] + assert len(items) == 1 + assert TestSelectStatements.item1 in items + + stmt = "select id from messages where id = ?" + items = TestSelectStatements.client.execute_statement( + Statement=stmt, Parameters=[{"S": "msg1"}] + )["Items"] + assert len(items) == 1 + assert {"id": {"S": "msg1"}} in items + + def test_execute_statement_with_no_results(self): + stmt = "select * from messages where id = ?" + items = TestSelectStatements.client.execute_statement( + Statement=stmt, Parameters=[{"S": "msg3"}] + )["Items"] + assert items == [] + + +@mock_dynamodb +class TestExecuteTransaction(TestCase): + def setUp(self): + self.client = boto3.client("dynamodb", "us-east-1") + self.client.create_table( + TableName="messages", + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], + AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}], + ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 5}, + ) + self.item1 = {"id": {"S": "msg1"}, "body": {"S": "some text"}} + self.item2 = {"id": {"S": "msg2"}, "body": {"S": "n/a"}, "unique": {"S": "key"}} + self.client.put_item(TableName="messages", Item=self.item1) + self.client.put_item(TableName="messages", Item=self.item2) + + def test_execute_transaction(self): + items = self.client.execute_transaction( + TransactStatements=[ + {"Statement": "select id from messages"}, + { + "Statement": "select * from messages where id = ?", + "Parameters": [{"S": "msg2"}], + }, + ] + )["Responses"] + assert len(items) == 3 + + +@mock_dynamodb +class TestBatchExecuteStatement(TestCase): + def setUp(self): + self.client = boto3.client("dynamodb", "us-east-1") + for name in ["table1", "table2"]: + self.client.create_table( + TableName=name, + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], + AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}], + ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 5}, + ) + self.item1 = {"id": {"S": "msg1"}, "body": {"S": "some text"}} + self.item2 = {"id": {"S": "msg2"}, "body": {"S": "n/a"}, "unique": {"S": "key"}} + self.client.put_item(TableName="table1", Item=self.item1) + self.client.put_item(TableName="table1", Item=self.item2) + self.client.put_item(TableName="table2", Item=self.item1) + + def test_execute_transaction(self): + items = self.client.batch_execute_statement( + Statements=[ + { + "Statement": "select id from table1 where id = ?", + "Parameters": [{"S": "msg1"}], + }, + { + "Statement": "select * from table2 where id = ?", + "Parameters": [{"S": "msg1"}], + }, + { + "Statement": "select * from table2 where id = ?", + "Parameters": [{"S": "msg2"}], + }, + ] + )["Responses"] + assert len(items) == 3 + assert {"TableName": "table1", "Item": {"id": {"S": "msg1"}}} in items + assert {"TableName": "table2", "Item": self.item1} in items + assert {"TableName": "table2"} in items + + def test_without_primary_key_in_where_clause(self): + items = self.client.batch_execute_statement( + Statements=[ + # Unknown table + {"Statement": "select id from unknown-table"}, + # No WHERE-clause + {"Statement": "select id from table1"}, + # WHERE-clause does not contain HashKey + { + "Statement": "select * from table1 where body = ?", + "Parameters": [{"S": "msg1"}], + }, + # Valid WHERE-clause + { + "Statement": "select * from table2 where id = ?", + "Parameters": [{"S": "msg1"}], + }, + ] + )["Responses"] + assert len(items) == 4 + assert { + "Error": { + "Code": "ResourceNotFound", + "Message": "Requested resource not found", + } + } in items + assert { + "Error": { + "Code": "ValidationError", + "Message": "Select statements within BatchExecuteStatement must " + "specify the primary key in the where clause.", + }, + "TableName": "table1", + } in items + assert { + "Error": { + "Code": "ValidationError", + "Message": "Select statements within BatchExecuteStatement must " + "specify the primary key in the where clause.", + }, + "TableName": "table1", + } in items + assert {"TableName": "table2", "Item": self.item1} in items