Scaffold-script improvements (#4641)

This commit is contained in:
Bert Blommers 2021-11-29 14:36:24 -01:00 committed by GitHub
parent 2cf37a4b90
commit 39fff64493
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 198 additions and 66 deletions

View File

@ -10,7 +10,7 @@ This script will look at the botocore's definition file for the selected
service and operation, then auto-generate the code and responses. service and operation, then auto-generate the code and responses.
Almost all services are supported, as long as the service's protocol is Almost all services are supported, as long as the service's protocol is
`query`, `json` or `rest-json`. Even if aws adds new services, this script `query`, `json`, `rest-xml` or `rest-json`. Even if aws adds new services, this script
will work if the protocol is known. will work if the protocol is known.
TODO: TODO:
@ -20,6 +20,7 @@ TODO:
should happen, please create an issue for the problem. should happen, please create an issue for the problem.
""" """
import os import os
import random
import re import re
import inspect import inspect
import importlib import importlib
@ -39,6 +40,8 @@ from moto.core import BaseBackend
from inflection import singularize from inflection import singularize
from implementation_coverage import get_moto_implementation from implementation_coverage import get_moto_implementation
PRIMITIVE_SHAPES = ["string", "timestamp", "integer", "boolean", "sensitiveStringType", "long"]
TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "./template") TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "./template")
INPUT_IGNORED_IN_BACKEND = ["Marker", "PageSize"] INPUT_IGNORED_IN_BACKEND = ["Marker", "PageSize"]
@ -51,44 +54,57 @@ def print_progress(title, body, color):
click.echo(body) click.echo(body)
def select_service_and_operation(): def select_service():
"""Prompt user to select service and operation.""" """Prompt user to select service and operation."""
service_name = None
service_names = Session().get_available_services() service_names = Session().get_available_services()
service_completer = WordCompleter(service_names) service_completer = WordCompleter(service_names)
service_name = prompt("Select service: ", completer=service_completer) while service_name not in service_names:
if service_name not in service_names: service_name = prompt("Select service: ", completer=service_completer)
click.secho(f"{service_name} is not valid service", fg="red") if service_name not in service_names:
raise click.Abort() click.secho(f"{service_name} is not valid service", fg="red")
moto_client = get_moto_implementation(service_name) return service_name
def print_service_status(service_name):
implemented, operation_names = get_operations(service_name)
click.echo("==Current Implementation Status==")
for operation_name in operation_names:
check = "X" if operation_name in implemented else " "
click.secho(f"[{check}] {operation_name}")
click.echo("=================================")
def select_operation(service_name):
implemented, operation_names = get_operations(service_name)
operation_completer = WordCompleter(operation_names)
operation_available = False
while not operation_available:
operation_name = prompt("Select Operation: ", completer=operation_completer)
if operation_name not in operation_names:
click.secho(f"{operation_name} is not valid operation", fg="red")
elif operation_name in implemented:
click.secho(f"{operation_name} is already implemented", fg="red")
else:
operation_available = True
return operation_name
def get_operations(service_name):
try:
moto_client, _name = get_moto_implementation(service_name)
except ModuleNotFoundError:
moto_client = None
real_client = boto3.client(service_name, region_name="us-east-1") real_client = boto3.client(service_name, region_name="us-east-1")
implemented = [] implemented = []
not_implemented = []
operation_names = [ operation_names = [
xform_name(op) for op in real_client.meta.service_model.operation_names xform_name(op) for op in real_client.meta.service_model.operation_names
] ]
for operation in operation_names: for operation in operation_names:
if moto_client and operation in dir(moto_client): if moto_client and operation in dir(moto_client):
implemented.append(operation) implemented.append(operation)
else: return implemented, operation_names
not_implemented.append(operation)
operation_completer = WordCompleter(operation_names)
click.echo("==Current Implementation Status==")
for operation_name in operation_names:
check = "X" if operation_name in implemented else " "
click.secho(f"[{check}] {operation_name}")
click.echo("=================================")
operation_name = prompt("Select Operation: ", completer=operation_completer)
if operation_name not in operation_names:
click.secho(f"{operation_name} is not valid operation", fg="red")
raise click.Abort()
if operation_name in implemented:
click.secho(f"{operation_name} is already implemented", fg="red")
raise click.Abort()
return service_name, operation_name
def get_escaped_service(service): def get_escaped_service(service):
@ -252,17 +268,17 @@ def get_function_in_responses(
output_names = [ output_names = [
to_snake_case(_) for _ in outputs.keys() if _ not in OUTPUT_IGNORED_IN_BACKEND to_snake_case(_) for _ in outputs.keys() if _ not in OUTPUT_IGNORED_IN_BACKEND
] ]
body = f"\ndef {operation}(self):\n"
body = ""
if protocol in ["rest-xml"]:
body += f"\ndef {operation}(self, request, full_url, headers):\n"
body += " self.setup_class(request, full_url, headers)\n"
else:
body = f"\ndef {operation}(self):\n"
body += " params = self._get_params()\n"
for input_name, input_type in inputs.items(): for input_name, input_type in inputs.items():
type_name = input_type.type_name body += f" {to_snake_case(input_name)} = params.get(\"{input_name}\")\n"
if type_name == "integer":
arg_line_tmpl = ' {}=self._get_int_param("{}")\n'
elif type_name == "list":
arg_line_tmpl = ' {}=self._get_list_prefix("{}.member")\n'
else:
arg_line_tmpl = ' {}=self._get_param("{}")\n'
body += arg_line_tmpl.format(to_snake_case(input_name), input_name)
if output_names: if output_names:
body += f" {', '.join(output_names)} = self.{escaped_service}_backend.{operation}(\n" body += f" {', '.join(output_names)} = self.{escaped_service}_backend.{operation}(\n"
else: else:
@ -271,7 +287,7 @@ def get_function_in_responses(
body += f" {input_name}={input_name},\n" body += f" {input_name}={input_name},\n"
body += " )\n" body += " )\n"
if protocol == "query": if protocol in ["query", "rest-xml"]:
body += f" template = self.response_template({operation.upper()}_TEMPLATE)\n" body += f" template = self.response_template({operation.upper()}_TEMPLATE)\n"
names = ", ".join([f"{n}={n}" for n in output_names]) names = ", ".join([f"{n}={n}" for n in output_names])
body += f" return template.render({names})\n" body += f" return template.render({names})\n"
@ -316,6 +332,24 @@ def get_function_in_models(service, operation):
return body return body
def get_func_in_tests(service, operation):
"""
Autogenerates an example unit test
Throws an exception by default, to remind the user to implement this
"""
escaped_service = get_escaped_service(service)
random_region = random.choice(["us-east-2", "eu-west-1", "ap-southeast-1"])
body = "\n\n"
body += f"@mock_{escaped_service}\n"
body += f"def test_{operation}():\n"
body += f" client = boto3.client(\"{service}\", region_name=\"{random_region}\")\n"
body += f" resp = client.{operation}()\n"
body += f"\n"
body += f" raise Exception(\"NotYetImplemented\")"
body += "\n"
return body
def _get_subtree(name, shape, replace_list, name_prefix=None): def _get_subtree(name, shape, replace_list, name_prefix=None):
if not name_prefix: if not name_prefix:
name_prefix = [] name_prefix = []
@ -416,6 +450,87 @@ def get_response_query_template(service, operation): # pylint: disable=too-many
return body return body
def get_response_restxml_template(service, operation):
"""refers to definition of API in botocore, and autogenerates template
Assume that protocol is rest-xml. Shares some familiarity with protocol=query
"""
client = boto3.client(service)
aws_operation_name = get_operation_name_in_keys(
to_upper_camel_case(operation),
list(client._service_model._service_description["operations"].keys()),
)
op_model = client._service_model.operation_model(aws_operation_name)
result_wrapper = op_model._operation_model["output"]["shape"]
response_wrapper = result_wrapper.replace("Result", "Response")
metadata = op_model.metadata
shapes = client._service_model._shape_resolver._shape_map
# build xml tree
t_root = etree.Element(response_wrapper)
# build metadata
t_metadata = etree.Element("ResponseMetadata")
t_request_id = etree.Element("RequestId")
t_request_id.text = "1549581b-12b7-11e3-895e-1334aEXAMPLE"
t_metadata.append(t_request_id)
t_root.append(t_metadata)
def _find_member(tree, shape_name, name_prefix):
shape = shapes[shape_name]
if shape["type"] == "list":
t_for = etree.Element("REPLACE_FOR")
t_for.set("for", to_snake_case(shape_name))
t_for.set("in", ".".join(name_prefix[:-1]) + "." + shape_name)
member_shape = shape["member"]["shape"]
member_name = shape["member"].get("locationName")
if member_shape in PRIMITIVE_SHAPES:
t_member = etree.Element(member_name)
t_member.text = f"{{{{ {to_snake_case(shape_name)}.{to_snake_case(member_name)} }}}}"
t_for.append(t_member)
else:
_find_member(t_for, member_shape, [to_snake_case(shape_name)])
tree.append(t_for)
elif shape["type"] in PRIMITIVE_SHAPES:
tree.text = f"{{{{ {shape_name} }}}}"
else:
for child, details in shape["members"].items():
child = details.get("locationName", child)
if details["shape"] in PRIMITIVE_SHAPES:
t_member = etree.Element(child)
t_member.text = "{{ " + ".".join(name_prefix) + f".{to_snake_case(child)} }}}}"
tree.append(t_member)
else:
t = etree.Element(child)
_find_member(t, details["shape"], name_prefix + [to_snake_case(child)])
tree.append(t)
# build result
t_result = etree.Element(result_wrapper)
for name, details in shapes[result_wrapper]["members"].items():
shape = details["shape"]
name = details.get("locationName", name)
if shape in PRIMITIVE_SHAPES:
t_member = etree.Element(name)
t_member.text = f"{{{{ {to_snake_case(name)} }}}}"
t_result.append(t_member)
else:
_find_member(t_result, name, name_prefix=["root"])
t_root.append(t_result)
xml_body = etree.tostring(t_root, pretty_print=True).decode("utf-8")
#
# Still need to add FOR-loops in this template
# <REPLACE_FOR for="x" in="y">
# becomes
# {% for x in y %}
def conv(m):
return m.group().replace("REPLACE_FOR", "").replace("=", "").replace('"', " ").replace("<", "{%").replace(">", "%}").strip()
xml_body = re.sub(r"<REPLACE_FOR[\sa-zA-Z\"=_.]+>", conv, xml_body)
xml_body = xml_body.replace("</REPLACE_FOR>", "{% endfor %}")
body = f'\n{operation.upper()}_TEMPLATE = """{xml_body}"""'
return body
def insert_code_to_class(path, base_class, new_code): def insert_code_to_class(path, base_class, new_code):
"""Add code for class handling service's response or backend.""" """Add code for class handling service's response or backend."""
with open(path, encoding="utf-8") as fhandle: with open(path, encoding="utf-8") as fhandle:
@ -477,6 +592,8 @@ def insert_url(service, operation, api_protocol): # pylint: disable=too-many-lo
# generate url pattern # generate url pattern
if api_protocol == "rest-json": if api_protocol == "rest-json":
new_line = ' "{0}/.*$": response.dispatch,' new_line = ' "{0}/.*$": response.dispatch,'
elif api_protocol == "rest-xml":
new_line = f' "{{0}}{uri}$": {service_class}Response.{operation},'
else: else:
new_line = f' "{{0}}{uri}$": {service_class}Response.dispatch,' new_line = f' "{{0}}{uri}$": {service_class}Response.dispatch,'
if new_line in lines: if new_line in lines:
@ -493,14 +610,18 @@ def insert_codes(service, operation, api_protocol):
escaped_service = get_escaped_service(service) escaped_service = get_escaped_service(service)
func_in_responses = get_function_in_responses(service, operation, api_protocol) func_in_responses = get_function_in_responses(service, operation, api_protocol)
func_in_models = get_function_in_models(service, operation) func_in_models = get_function_in_models(service, operation)
func_in_tests = get_func_in_tests(service, operation)
# edit responses.py # edit responses.py
responses_path = f"moto/{escaped_service}/responses.py" responses_path = f"moto/{escaped_service}/responses.py"
print_progress("inserting code", responses_path, "green") print_progress("inserting code", responses_path, "green")
insert_code_to_class(responses_path, BaseResponse, func_in_responses) insert_code_to_class(responses_path, BaseResponse, func_in_responses)
# insert template # insert template
if api_protocol == "query": if api_protocol in ["query", "rest-xml"]:
template = get_response_query_template(service, operation) if api_protocol == "query":
template = get_response_query_template(service, operation)
elif api_protocol == "rest-xml":
template = get_response_restxml_template(service, operation)
with open(responses_path, encoding="utf-8") as fhandle: with open(responses_path, encoding="utf-8") as fhandle:
lines = [_[:-1] for _ in fhandle.readlines()] lines = [_[:-1] for _ in fhandle.readlines()]
lines += template.splitlines() lines += template.splitlines()
@ -515,6 +636,12 @@ def insert_codes(service, operation, api_protocol):
# edit urls.py # edit urls.py
insert_url(service, operation, api_protocol) insert_url(service, operation, api_protocol)
# Edit tests
tests_path= f"tests/test_{escaped_service}/test_{escaped_service}.py"
print_progress("inserting code", tests_path, "green")
with open(tests_path, "a", encoding="utf-8") as fhandle:
fhandle.write(func_in_tests)
@click.command() @click.command()
def main(): def main():
@ -526,28 +653,35 @@ def main():
" - Press enter to continue\n") " - Press enter to continue\n")
"""Create basic files needed for the user's choice of service and op.""" """Create basic files needed for the user's choice of service and op."""
service, operation = select_service_and_operation() service = select_service()
print_service_status(service)
# pylint: disable=protected-access while True:
api_protocol = boto3.client(service)._service_model.metadata["protocol"] operation = select_operation(service)
initialize_service(service, api_protocol)
if api_protocol in ["query", "json", "rest-json"]: # pylint: disable=protected-access
insert_codes(service, operation, api_protocol) api_protocol = boto3.client(service)._service_model.metadata["protocol"]
else: initialize_service(service, api_protocol)
print_progress(
"skip inserting code", if api_protocol in ["query", "json", "rest-json", "rest-xml"]:
f'api protocol "{api_protocol}" is not supported', insert_codes(service, operation, api_protocol)
"yellow", else:
print_progress(
"skip inserting code",
f'api protocol "{api_protocol}" is not supported',
"yellow",
)
click.echo(
"\n"
"Please select another operation, or Ctrl-X/Ctrl-C to cancel."
"\n\n"
"Remaining steps after development is complete:\n"
'- Run scripts/implementation_coverage.py,\n'
"- Run scripts/update_backend_index.py."
"\n"
) )
click.echo(
"\n"
"Remaining steps after development is complete:\n"
'- Run scripts/implementation_coverage.py,\n'
"- Run scripts/update_backend_index.py."
)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -2,11 +2,12 @@
import sure # noqa # pylint: disable=unused-import import sure # noqa # pylint: disable=unused-import
import moto.server as server import moto.server as server
from moto import mock_{{ escaped_service }}
@mock_{{ escaped_service }}
def test_{{ escaped_service }}_list(): def test_{{ escaped_service }}_list():
backend = server.create_backend_app("{{ service }}") backend = server.create_backend_app("{{ service }}")
test_client = backend.test_client() test_client = backend.test_client()
# do test
resp = test_client.get("/")
resp.status_code.should.equal(200)
str(resp.data).should.contain("?")

View File

@ -4,9 +4,6 @@ import boto3
import sure # noqa # pylint: disable=unused-import import sure # noqa # pylint: disable=unused-import
from moto import mock_{{ escaped_service }} from moto import mock_{{ escaped_service }}
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_{{ escaped_service }}
def test_list():
"""Test input/output of the list API."""
# do test
pass