From 39fff6449382e17ac949d14588642a3e117aede3 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 29 Nov 2021 14:36:24 -0100 Subject: [PATCH] Scaffold-script improvements (#4641) --- scripts/scaffold.py | 250 +++++++++++++++++------ scripts/template/test/test_server.py.j2 | 7 +- scripts/template/test/test_service.py.j2 | 7 +- 3 files changed, 198 insertions(+), 66 deletions(-) diff --git a/scripts/scaffold.py b/scripts/scaffold.py index 058babb66..d70f89add 100755 --- a/scripts/scaffold.py +++ b/scripts/scaffold.py @@ -10,7 +10,7 @@ This script will look at the botocore's definition file for the selected service and operation, then auto-generate the code and responses. Almost all services are supported, as long as the service's protocol is -`query`, `json` or `rest-json`. Even if aws adds new services, this script +`query`, `json`, `rest-xml` or `rest-json`. Even if aws adds new services, this script will work if the protocol is known. TODO: @@ -20,6 +20,7 @@ TODO: should happen, please create an issue for the problem. """ import os +import random import re import inspect import importlib @@ -39,6 +40,8 @@ from moto.core import BaseBackend from inflection import singularize from implementation_coverage import get_moto_implementation +PRIMITIVE_SHAPES = ["string", "timestamp", "integer", "boolean", "sensitiveStringType", "long"] + TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "./template") INPUT_IGNORED_IN_BACKEND = ["Marker", "PageSize"] @@ -51,44 +54,57 @@ def print_progress(title, body, color): click.echo(body) -def select_service_and_operation(): +def select_service(): """Prompt user to select service and operation.""" + service_name = None service_names = Session().get_available_services() service_completer = WordCompleter(service_names) - service_name = prompt("Select service: ", completer=service_completer) - if service_name not in service_names: - click.secho(f"{service_name} is not valid service", fg="red") - raise click.Abort() - moto_client = get_moto_implementation(service_name) + while service_name not in service_names: + service_name = prompt("Select service: ", completer=service_completer) + if service_name not in service_names: + click.secho(f"{service_name} is not valid service", fg="red") + return service_name + + +def print_service_status(service_name): + implemented, operation_names = get_operations(service_name) + click.echo("==Current Implementation Status==") + for operation_name in operation_names: + check = "X" if operation_name in implemented else " " + click.secho(f"[{check}] {operation_name}") + click.echo("=================================") + + +def select_operation(service_name): + implemented, operation_names = get_operations(service_name) + operation_completer = WordCompleter(operation_names) + + operation_available = False + while not operation_available: + operation_name = prompt("Select Operation: ", completer=operation_completer) + if operation_name not in operation_names: + click.secho(f"{operation_name} is not valid operation", fg="red") + elif operation_name in implemented: + click.secho(f"{operation_name} is already implemented", fg="red") + else: + operation_available = True + return operation_name + + +def get_operations(service_name): + try: + moto_client, _name = get_moto_implementation(service_name) + except ModuleNotFoundError: + moto_client = None real_client = boto3.client(service_name, region_name="us-east-1") implemented = [] - not_implemented = [] - operation_names = [ xform_name(op) for op in real_client.meta.service_model.operation_names ] for operation in operation_names: if moto_client and operation in dir(moto_client): implemented.append(operation) - else: - not_implemented.append(operation) - operation_completer = WordCompleter(operation_names) - - click.echo("==Current Implementation Status==") - for operation_name in operation_names: - check = "X" if operation_name in implemented else " " - click.secho(f"[{check}] {operation_name}") - click.echo("=================================") - operation_name = prompt("Select Operation: ", completer=operation_completer) - - if operation_name not in operation_names: - click.secho(f"{operation_name} is not valid operation", fg="red") - raise click.Abort() - - if operation_name in implemented: - click.secho(f"{operation_name} is already implemented", fg="red") - raise click.Abort() - return service_name, operation_name + return implemented, operation_names def get_escaped_service(service): @@ -252,17 +268,17 @@ def get_function_in_responses( output_names = [ to_snake_case(_) for _ in outputs.keys() if _ not in OUTPUT_IGNORED_IN_BACKEND ] - body = f"\ndef {operation}(self):\n" + + body = "" + if protocol in ["rest-xml"]: + body += f"\ndef {operation}(self, request, full_url, headers):\n" + body += " self.setup_class(request, full_url, headers)\n" + else: + body = f"\ndef {operation}(self):\n" + body += " params = self._get_params()\n" for input_name, input_type in inputs.items(): - type_name = input_type.type_name - if type_name == "integer": - arg_line_tmpl = ' {}=self._get_int_param("{}")\n' - elif type_name == "list": - arg_line_tmpl = ' {}=self._get_list_prefix("{}.member")\n' - else: - arg_line_tmpl = ' {}=self._get_param("{}")\n' - body += arg_line_tmpl.format(to_snake_case(input_name), input_name) + body += f" {to_snake_case(input_name)} = params.get(\"{input_name}\")\n" if output_names: body += f" {', '.join(output_names)} = self.{escaped_service}_backend.{operation}(\n" else: @@ -271,7 +287,7 @@ def get_function_in_responses( body += f" {input_name}={input_name},\n" body += " )\n" - if protocol == "query": + if protocol in ["query", "rest-xml"]: body += f" template = self.response_template({operation.upper()}_TEMPLATE)\n" names = ", ".join([f"{n}={n}" for n in output_names]) body += f" return template.render({names})\n" @@ -316,6 +332,24 @@ def get_function_in_models(service, operation): return body +def get_func_in_tests(service, operation): + """ + Autogenerates an example unit test + Throws an exception by default, to remind the user to implement this + """ + escaped_service = get_escaped_service(service) + random_region = random.choice(["us-east-2", "eu-west-1", "ap-southeast-1"]) + body = "\n\n" + body += f"@mock_{escaped_service}\n" + body += f"def test_{operation}():\n" + body += f" client = boto3.client(\"{service}\", region_name=\"{random_region}\")\n" + body += f" resp = client.{operation}()\n" + body += f"\n" + body += f" raise Exception(\"NotYetImplemented\")" + body += "\n" + return body + + def _get_subtree(name, shape, replace_list, name_prefix=None): if not name_prefix: name_prefix = [] @@ -416,6 +450,87 @@ def get_response_query_template(service, operation): # pylint: disable=too-many return body +def get_response_restxml_template(service, operation): + """refers to definition of API in botocore, and autogenerates template + Assume that protocol is rest-xml. Shares some familiarity with protocol=query + """ + client = boto3.client(service) + aws_operation_name = get_operation_name_in_keys( + to_upper_camel_case(operation), + list(client._service_model._service_description["operations"].keys()), + ) + op_model = client._service_model.operation_model(aws_operation_name) + result_wrapper = op_model._operation_model["output"]["shape"] + response_wrapper = result_wrapper.replace("Result", "Response") + metadata = op_model.metadata + + shapes = client._service_model._shape_resolver._shape_map + + # build xml tree + t_root = etree.Element(response_wrapper) + + # build metadata + t_metadata = etree.Element("ResponseMetadata") + t_request_id = etree.Element("RequestId") + t_request_id.text = "1549581b-12b7-11e3-895e-1334aEXAMPLE" + t_metadata.append(t_request_id) + t_root.append(t_metadata) + + def _find_member(tree, shape_name, name_prefix): + shape = shapes[shape_name] + if shape["type"] == "list": + t_for = etree.Element("REPLACE_FOR") + t_for.set("for", to_snake_case(shape_name)) + t_for.set("in", ".".join(name_prefix[:-1]) + "." + shape_name) + member_shape = shape["member"]["shape"] + member_name = shape["member"].get("locationName") + if member_shape in PRIMITIVE_SHAPES: + t_member = etree.Element(member_name) + t_member.text = f"{{{{ {to_snake_case(shape_name)}.{to_snake_case(member_name)} }}}}" + t_for.append(t_member) + else: + _find_member(t_for, member_shape, [to_snake_case(shape_name)]) + tree.append(t_for) + elif shape["type"] in PRIMITIVE_SHAPES: + tree.text = f"{{{{ {shape_name} }}}}" + else: + for child, details in shape["members"].items(): + child = details.get("locationName", child) + if details["shape"] in PRIMITIVE_SHAPES: + t_member = etree.Element(child) + t_member.text = "{{ " + ".".join(name_prefix) + f".{to_snake_case(child)} }}}}" + tree.append(t_member) + else: + t = etree.Element(child) + _find_member(t, details["shape"], name_prefix + [to_snake_case(child)]) + tree.append(t) + + # build result + t_result = etree.Element(result_wrapper) + for name, details in shapes[result_wrapper]["members"].items(): + shape = details["shape"] + name = details.get("locationName", name) + if shape in PRIMITIVE_SHAPES: + t_member = etree.Element(name) + t_member.text = f"{{{{ {to_snake_case(name)} }}}}" + t_result.append(t_member) + else: + _find_member(t_result, name, name_prefix=["root"]) + t_root.append(t_result) + xml_body = etree.tostring(t_root, pretty_print=True).decode("utf-8") + # + # Still need to add FOR-loops in this template + # + # becomes + # {% for x in y %} + def conv(m): + return m.group().replace("REPLACE_FOR", "").replace("=", "").replace('"', " ").replace("<", "{%").replace(">", "%}").strip() + xml_body = re.sub(r"", conv, xml_body) + xml_body = xml_body.replace("", "{% endfor %}") + body = f'\n{operation.upper()}_TEMPLATE = """{xml_body}"""' + return body + + def insert_code_to_class(path, base_class, new_code): """Add code for class handling service's response or backend.""" with open(path, encoding="utf-8") as fhandle: @@ -477,6 +592,8 @@ def insert_url(service, operation, api_protocol): # pylint: disable=too-many-lo # generate url pattern if api_protocol == "rest-json": new_line = ' "{0}/.*$": response.dispatch,' + elif api_protocol == "rest-xml": + new_line = f' "{{0}}{uri}$": {service_class}Response.{operation},' else: new_line = f' "{{0}}{uri}$": {service_class}Response.dispatch,' if new_line in lines: @@ -493,14 +610,18 @@ def insert_codes(service, operation, api_protocol): escaped_service = get_escaped_service(service) func_in_responses = get_function_in_responses(service, operation, api_protocol) func_in_models = get_function_in_models(service, operation) + func_in_tests = get_func_in_tests(service, operation) # edit responses.py responses_path = f"moto/{escaped_service}/responses.py" print_progress("inserting code", responses_path, "green") insert_code_to_class(responses_path, BaseResponse, func_in_responses) # insert template - if api_protocol == "query": - template = get_response_query_template(service, operation) + if api_protocol in ["query", "rest-xml"]: + if api_protocol == "query": + template = get_response_query_template(service, operation) + elif api_protocol == "rest-xml": + template = get_response_restxml_template(service, operation) with open(responses_path, encoding="utf-8") as fhandle: lines = [_[:-1] for _ in fhandle.readlines()] lines += template.splitlines() @@ -515,6 +636,12 @@ def insert_codes(service, operation, api_protocol): # edit urls.py insert_url(service, operation, api_protocol) + # Edit tests + tests_path= f"tests/test_{escaped_service}/test_{escaped_service}.py" + print_progress("inserting code", tests_path, "green") + with open(tests_path, "a", encoding="utf-8") as fhandle: + fhandle.write(func_in_tests) + @click.command() def main(): @@ -526,28 +653,35 @@ def main(): " - Press enter to continue\n") """Create basic files needed for the user's choice of service and op.""" - service, operation = select_service_and_operation() + service = select_service() + print_service_status(service) - # pylint: disable=protected-access - api_protocol = boto3.client(service)._service_model.metadata["protocol"] - initialize_service(service, api_protocol) + while True: + operation = select_operation(service) - if api_protocol in ["query", "json", "rest-json"]: - insert_codes(service, operation, api_protocol) - else: - print_progress( - "skip inserting code", - f'api protocol "{api_protocol}" is not supported', - "yellow", + # pylint: disable=protected-access + api_protocol = boto3.client(service)._service_model.metadata["protocol"] + initialize_service(service, api_protocol) + + if api_protocol in ["query", "json", "rest-json", "rest-xml"]: + insert_codes(service, operation, api_protocol) + else: + print_progress( + "skip inserting code", + f'api protocol "{api_protocol}" is not supported', + "yellow", + ) + + click.echo( + "\n" + "Please select another operation, or Ctrl-X/Ctrl-C to cancel." + "\n\n" + "Remaining steps after development is complete:\n" + '- Run scripts/implementation_coverage.py,\n' + "- Run scripts/update_backend_index.py." + "\n" ) - click.echo( - "\n" - "Remaining steps after development is complete:\n" - '- Run scripts/implementation_coverage.py,\n' - "- Run scripts/update_backend_index.py." - ) - if __name__ == "__main__": main() diff --git a/scripts/template/test/test_server.py.j2 b/scripts/template/test/test_server.py.j2 index 805f95aeb..2b977bedd 100644 --- a/scripts/template/test/test_server.py.j2 +++ b/scripts/template/test/test_server.py.j2 @@ -2,11 +2,12 @@ import sure # noqa # pylint: disable=unused-import import moto.server as server -from moto import mock_{{ escaped_service }} -@mock_{{ escaped_service }} def test_{{ escaped_service }}_list(): backend = server.create_backend_app("{{ service }}") test_client = backend.test_client() - # do test + + resp = test_client.get("/") + resp.status_code.should.equal(200) + str(resp.data).should.contain("?") diff --git a/scripts/template/test/test_service.py.j2 b/scripts/template/test/test_service.py.j2 index 6308e679d..da7957478 100644 --- a/scripts/template/test/test_service.py.j2 +++ b/scripts/template/test/test_service.py.j2 @@ -4,9 +4,6 @@ import boto3 import sure # noqa # pylint: disable=unused-import from moto import mock_{{ escaped_service }} +# See our Development Tips on writing tests for hints on how to write good tests: +# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html -@mock_{{ escaped_service }} -def test_list(): - """Test input/output of the list API.""" - # do test - pass