moto/scripts/scaffold.py

542 lines
19 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""Generates template code and response body for specified boto3's operation.
2017-09-23 08:03:42 +00:00
You only have to select service and operation that you want to add.
This script looks at the botocore's definition file of specified service and
operation, and auto-generates code and responses.
Basically, this script supports almost all services, as long as its
protocol is `query`, `json` or `rest-json`. Even if aws adds new
services, this script will work as long as the protocol is known.
2017-09-23 08:03:42 +00:00
TODO:
- This script doesn't generate functions in `responses.py` for
`rest-json`, because I don't know the rules for it. I'd like someone to fix this.
- For some services' operations, this script might crash. If that happens,
please open a new issue on GitHub.
2017-09-23 08:03:42 +00:00
"""
import os
2017-09-19 19:36:11 +00:00
import re
import inspect
import importlib
2017-09-19 19:36:11 +00:00
from lxml import etree
import click
import jinja2
2020-10-06 06:46:05 +00:00
from prompt_toolkit import prompt
from prompt_toolkit.completion import WordCompleter
from botocore import xform_name
from botocore.session import Session
import boto3
from moto.core.responses import BaseResponse
from moto.core import BaseBackend
2017-09-19 19:36:11 +00:00
from inflection import singularize
from implementation_coverage import get_moto_implementation
2017-09-19 19:36:11 +00:00
2020-10-06 06:46:05 +00:00
# Directory containing the Jinja2 templates, located next to this script.
TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "./template")
2020-10-06 06:46:05 +00:00
# Request member names that are not forwarded to the generated backend method.
INPUT_IGNORED_IN_BACKEND = ["Marker", "PageSize"]
# Response member names that the generated backend method does not return.
OUTPUT_IGNORED_IN_BACKEND = ["NextMarker"]
2017-09-19 19:36:11 +00:00
def print_progress(title, body, color):
    """Print a tab-padded *title* in *color*, followed by *body* on the same line."""
    label = "\t{}\t".format(title)
    # nl=False keeps the cursor on the line so the body follows the label.
    click.secho(label, fg=color, nl=False)
    click.echo(body)
def select_service_and_operation():
    """Interactively ask for a service and one of its unimplemented operations.

    Returns:
        A ``(service_name, operation_name)`` tuple. The operation name is in
        the form produced by ``xform_name``.

    Raises:
        click.Abort: if the entered service or operation is not in the list
            of known names, or the operation already exists on the moto
            implementation.
    """
    known_services = Session().get_available_services()
    service_name = prompt(
        "Select service: ", completer=WordCompleter(known_services)
    )
    if service_name not in known_services:
        click.secho("{} is not valid service".format(service_name), fg="red")
        raise click.Abort()

    moto_client = get_moto_implementation(service_name)
    real_client = boto3.client(service_name, region_name="us-east-1")
    operation_names = [
        xform_name(op) for op in real_client.meta.service_model.operation_names
    ]
    # An operation counts as implemented when the moto implementation object
    # exposes an attribute with the same name.
    implemented = [
        candidate
        for candidate in operation_names
        if moto_client and candidate in dir(moto_client)
    ]

    click.echo("==Current Implementation Status==")
    for candidate in operation_names:
        marker = "X" if candidate in implemented else " "
        click.secho("[{}] {}".format(marker, candidate))
    click.echo("=================================")

    operation_name = prompt(
        "Select Operation: ", completer=WordCompleter(operation_names)
    )
    if operation_name not in operation_names:
        click.secho("{} is not valid operation".format(operation_name), fg="red")
        raise click.Abort()
    if operation_name in implemented:
        click.secho("{} is already implemented".format(operation_name), fg="red")
        raise click.Abort()
    return service_name, operation_name
2020-10-06 06:46:05 +00:00
def get_escaped_service(service):
    """Return *service* with every hyphen removed."""
    return "".join(service.split("-"))
def get_lib_dir(service):
    """Return the relative path ``moto/<escaped service>`` for *service*."""
    package_name = get_escaped_service(service)
    return os.path.join("moto", package_name)
def get_test_dir(service):
    """Return the relative path ``tests/test_<escaped service>`` for *service*."""
    test_package = "test_" + get_escaped_service(service)
    return os.path.join("tests", test_package)
2017-09-22 05:03:12 +00:00
def render_template(tmpl_dir, tmpl_filename, context, service, alt_filename=None):
    """Render one Jinja2 template into the service's lib or test directory.

    Args:
        tmpl_dir: directory the template is loaded from. When its final path
            component contains ``"test"`` the output goes to the test
            directory, otherwise to the lib directory.
        tmpl_filename: template file name inside *tmpl_dir*.
        context: mapping passed to the template's ``render``.
        service: service name used to derive the output directory.
        alt_filename: output file name; defaults to *tmpl_filename* with its
            last extension removed.

    An existing output file is left untouched.
    """
    # Look only at the template directory's own name. Testing the whole path
    # would misroute every file when the checkout itself lives under a
    # directory whose name happens to contain "test".
    tmpl_dir_name = os.path.basename(os.path.normpath(tmpl_dir))
    is_test = "test" in tmpl_dir_name

    environment = jinja2.Environment(loader=jinja2.FileSystemLoader(tmpl_dir))
    rendered = environment.get_template(tmpl_filename).render(context)

    dirname = get_test_dir(service) if is_test else get_lib_dir(service)
    filename = alt_filename or os.path.splitext(tmpl_filename)[0]
    filepath = os.path.join(dirname, filename)

    if os.path.exists(filepath):
        print_progress("skip creating", filepath, "yellow")
        return
    print_progress("creating", filepath, "green")
    with open(filepath, "w") as fhandle:
        fhandle.write(rendered)
def append_mock_to_init_py(service):
    """Register ``mock_<service>`` in ``moto/__init__.py`` if it is missing.

    The new ``lazy_load`` line is inserted right after the last existing
    ``mock_... lazy_load`` line, or at the end of the file when there is
    none. Nothing is written when the mock is already registered.
    """
    path = os.path.join(os.path.dirname(__file__), "..", "moto", "__init__.py")
    with open(path) as fhandle:
        lines = [line.replace("\n", "") for line in fhandle.readlines()]

    escaped = get_escaped_service(service)
    # Compare against the escaped name (the one that is actually written) and
    # require the "=" so that e.g. "ec2" is not mistaken for an existing
    # "ec2instanceconnect" entry.
    already_there = re.compile(
        r"^mock_{}\s*=\s*lazy_load\(".format(re.escape(escaped))
    )
    if any(already_there.match(line) for line in lines):
        return

    lazy_load_indexes = [
        index
        for index, line in enumerate(lines)
        if re.match(r"^mock_.*lazy_load", line)
    ]
    insert_at = lazy_load_indexes[-1] + 1 if lazy_load_indexes else len(lines)

    new_line = 'mock_{0} = lazy_load(".{0}", "mock_{0}")'.format(escaped)
    lines.insert(insert_at, new_line)

    body = "\n".join(lines) + "\n"
    with open(path, "w") as fhandle:
        fhandle.write(body)
def append_mock_dict_to_backends_py(service):
    """Add the service's entry to the mapping in ``moto/backends.py``.

    The entry is inserted after the last existing ``"...": ..._backends``
    line; a trailing comma is added to that line when it has none. Nothing is
    written when a line for ``"<service>":`` already exists.

    Raises:
        ValueError: if no existing backend entry is found to insert after.
    """
    path = os.path.join(os.path.dirname(__file__), "..", "moto", "backends.py")
    with open(path) as fhandle:
        lines = [line.replace("\n", "") for line in fhandle.readlines()]

    # Match on the dictionary key only, so the check recognises the tuple
    # form written below and re-running the script does not add duplicates.
    existing_key = re.compile(r'\s*"{}":'.format(re.escape(service)))
    if any(existing_key.match(line) for line in lines):
        return

    entry_indexes = [
        index
        for index, line in enumerate(lines)
        if re.match('.*".*":.*_backends.*', line)
    ]
    if not entry_indexes:
        raise ValueError("no backend entry found in {}".format(path))
    last_elem_line_index = entry_indexes[-1]

    escaped = get_escaped_service(service)
    # NOTE(review): the literal's leading indentation is reproduced exactly
    # as found; confirm it matches the indentation used in backends.py.
    new_line = ' "{}": ("{}", "{}_backends"),'.format(service, escaped, escaped)

    prev_line = lines[last_elem_line_index]
    if not prev_line.endswith("{") and not prev_line.endswith(","):
        lines[last_elem_line_index] += ","
    lines.insert(last_elem_line_index + 1, new_line)

    body = "\n".join(lines) + "\n"
    with open(path, "w") as fhandle:
        fhandle.write(body)
2020-10-06 06:46:05 +00:00
def initialize_service(service, api_protocol):
    """Create the lib and test directories for *service* if they do not exist.

    Every template under ``TEMPLATE_DIR/lib`` and ``TEMPLATE_DIR/test`` is
    rendered into the corresponding directory, then the mock is registered in
    ``moto/__init__.py`` and ``moto/backends.py``.

    Args:
        service: service name as selected by the user.
        api_protocol: protocol string, exposed to the templates as
            ``api_protocol``.
    """
    lib_dir = get_lib_dir(service)
    test_dir = get_test_dir(service)

    print_progress("Initializing service", service, "green")

    # Pin the region, as select_service_and_operation does, so client
    # creation does not depend on a default region being configured.
    client = boto3.client(service, region_name="us-east-1")
    tmpl_context = {
        "service": service,
        "service_class": client.__class__.__name__,
        "endpoint_prefix": client.meta.service_model.endpoint_prefix,
        "api_protocol": api_protocol,
        "escaped_service": get_escaped_service(service),
    }

    # initialize service directory
    if os.path.exists(lib_dir):
        print_progress("skip creating", lib_dir, "yellow")
    else:
        print_progress("creating", lib_dir, "green")
        os.makedirs(lib_dir)

    tmpl_dir = os.path.join(TEMPLATE_DIR, "lib")
    for tmpl_filename in os.listdir(tmpl_dir):
        render_template(tmpl_dir, tmpl_filename, tmpl_context, service)

    # initialize test directory
    if os.path.exists(test_dir):
        print_progress("skip creating", test_dir, "yellow")
    else:
        print_progress("creating", test_dir, "green")
        os.makedirs(test_dir)

    tmpl_dir = os.path.join(TEMPLATE_DIR, "test")
    for tmpl_filename in os.listdir(tmpl_dir):
        # The generic test template is renamed after the service.
        alt_filename = (
            "test_{}.py".format(get_escaped_service(service))
            if tmpl_filename == "test_service.py.j2"
            else None
        )
        render_template(tmpl_dir, tmpl_filename, tmpl_context, service, alt_filename)

    # append mock to init files
    append_mock_to_init_py(service)
    append_mock_dict_to_backends_py(service)
def to_upper_camel_case(string):
    """Convert ``snake_case`` to ``UpperCamelCase`` by title-casing each part."""
    parts = string.split("_")
    return "".join(map(str.title, parts))
2017-09-19 19:36:11 +00:00
def to_lower_camel_case(string):
    """Convert ``snake_case`` to ``lowerCamelCase``; the first part is kept as is."""
    head, *tail = string.split("_")
    return head + "".join(part.title() for part in tail)
def to_snake_case(string):
    """Convert a CamelCase name to ``snake_case``."""
    # First pass: put "_" before each capitalised word (capital + lowercase run).
    with_word_breaks = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", string)
    # Second pass: split the remaining lowercase/digit-to-uppercase boundaries.
    fully_split = re.sub("([a-z0-9])([A-Z])", r"\1_\2", with_word_breaks)
    return fully_split.lower()
2020-10-06 06:46:05 +00:00
2017-09-19 19:36:11 +00:00
def get_operation_name_in_keys(operation_name, operation_keys):
    """Return the entry of *operation_keys* equal to *operation_name*, ignoring case.

    Raises:
        ValueError: if no key matches.
    """
    lowered_keys = list(map(str.lower, operation_keys))
    position = lowered_keys.index(operation_name.lower())
    return operation_keys[position]
2017-09-19 19:36:11 +00:00
2020-10-06 06:46:05 +00:00
def get_function_in_responses(service, operation, protocol):
    """Generate the source of the ``responses.py`` method for *operation*.

    The botocore definition of the operation is used to emit one parameter
    lookup per input member, a call to ``self.<service>_backend.<operation>``
    and, depending on *protocol*, the code that renders the response.
    You can see example of elbv2 from link below.
    https://github.com/boto/botocore/blob/develop/botocore/data/elbv2/2015-12-01/service-2.json

    Args:
        service: service name.
        operation: snake_case operation name.
        protocol: ``"query"`` renders a template; ``"json"`` and
            ``"rest-json"`` dump a dict; anything else emits no return.

    Returns:
        The generated (un-indented) method source as a string.
    """
    # Pin the region so client creation does not require a configured default.
    service_model = boto3.client(service, region_name="us-east-1").meta.service_model
    aws_operation_name = get_operation_name_in_keys(
        to_upper_camel_case(operation), service_model.operation_names
    )
    op_model = service_model.operation_model(aws_operation_name)

    # Either shape may be absent (None); treat that as "no members".
    inputs = getattr(op_model.input_shape, "members", {})
    outputs = getattr(op_model.output_shape, "members", {})

    input_names = [
        to_snake_case(name) for name in inputs if name not in INPUT_IGNORED_IN_BACKEND
    ]
    output_names = [
        to_snake_case(name) for name in outputs if name not in OUTPUT_IGNORED_IN_BACKEND
    ]

    # NOTE(review): the leading whitespace of the generated lines is
    # reproduced exactly as found; confirm it is the intended indentation.
    chunks = ["\ndef {}(self):\n".format(operation)]

    for input_name, input_type in inputs.items():
        type_name = input_type.type_name
        if type_name == "integer":
            arg_line_tmpl = ' {} = self._get_int_param("{}")\n'
        elif type_name == "list":
            arg_line_tmpl = ' {} = self._get_list_prefix("{}.member")\n'
        else:
            arg_line_tmpl = ' {} = self._get_param("{}")\n'
        chunks.append(arg_line_tmpl.format(to_snake_case(input_name), input_name))

    if output_names:
        chunks.append(
            " {} = self.{}_backend.{}(\n".format(
                ", ".join(output_names), get_escaped_service(service), operation
            )
        )
    else:
        chunks.append(
            " self.{}_backend.{}(\n".format(get_escaped_service(service), operation)
        )
    chunks.extend(f" {input_name}={input_name},\n" for input_name in input_names)
    chunks.append(" )\n")

    if protocol == "query":
        chunks.append(
            " template = self.response_template({}_TEMPLATE)\n".format(
                operation.upper()
            )
        )
        chunks.append(
            " return template.render({})\n".format(
                ", ".join(f"{n}={n}" for n in output_names)
            )
        )
    elif protocol in ["json", "rest-json"]:
        chunks.append(" # TODO: adjust response\n")
        chunks.append(
            " return json.dumps(dict({}))\n".format(
                ", ".join(
                    "{}={}".format(to_lower_camel_case(n), n) for n in output_names
                )
            )
        )
    return "".join(chunks)
def get_function_in_models(service, operation):
    """Generate the source of the ``models.py`` backend method for *operation*.

    The method takes one argument per input member (minus those listed in
    ``INPUT_IGNORED_IN_BACKEND``) and returns one name per output member
    (minus those listed in ``OUTPUT_IGNORED_IN_BACKEND``).
    You can see example of elbv2 from link below.
    https://github.com/boto/botocore/blob/develop/botocore/data/elbv2/2015-12-01/service-2.json

    Returns:
        The generated (un-indented) method source as a string.
    """
    # Pin the region so client creation does not require a configured default.
    service_model = boto3.client(service, region_name="us-east-1").meta.service_model
    aws_operation_name = get_operation_name_in_keys(
        to_upper_camel_case(operation), service_model.operation_names
    )
    op_model = service_model.operation_model(aws_operation_name)

    # Either shape may be absent (None); treat that as "no members".
    inputs = getattr(op_model.input_shape, "members", {})
    outputs = getattr(op_model.output_shape, "members", {})

    input_names = [
        to_snake_case(name) for name in inputs if name not in INPUT_IGNORED_IN_BACKEND
    ]
    output_names = [
        to_snake_case(name) for name in outputs if name not in OUTPUT_IGNORED_IN_BACKEND
    ]

    # One code path for both cases, so a parameterless operation also gets
    # its name and the trailing colon.
    signature = ", ".join(["self"] + input_names)
    body = "def {}({}):\n".format(operation, signature)
    # NOTE(review): the leading whitespace of the generated lines is
    # reproduced exactly as found; confirm it is the intended indentation.
    body += " # implement here\n"
    body += " return {}\n\n".format(", ".join(output_names))
    return body
def _get_subtree(name, shape, replace_list, name_prefix=None):
    """Build the XML element for one output member.

    Shapes whose class is ``StringShape`` or ``Shape`` become an element whose
    text is a ``{{ ... }}`` placeholder. A ``ListShape`` becomes an element
    with a single ``member`` child holding the sub-trees of the list member's
    own members, and ``(name, name_prefix)`` is appended to *replace_list*.

    Raises:
        ValueError: for any other shape class.
    """
    prefix_chain = name_prefix if name_prefix else []
    shape_kind = shape.__class__.__name__

    if shape_kind == "ListShape":
        replace_list.append((name, prefix_chain))
        list_node = etree.Element(name)
        member_node = etree.Element("member")
        list_node.append(member_node)
        for nested_name, nested_shape in shape.member.members.items():
            # Nested placeholders are addressed through the singular,
            # lower-cased name of this list.
            nested_prefix = prefix_chain + [singularize(name.lower())]
            child = _get_subtree(nested_name, nested_shape, replace_list, nested_prefix)
            member_node.append(child)
        return list_node

    if shape_kind not in ("StringShape", "Shape"):
        raise ValueError("Not supported Shape")

    leaf = etree.Element(name)
    if prefix_chain:
        leaf.text = "{{ %s.%s }}" % (prefix_chain[-1], to_snake_case(name))
    else:
        leaf.text = "{{ %s }}" % to_snake_case(name)
    return leaf
2017-09-19 19:36:11 +00:00
2017-09-21 12:23:13 +00:00
def get_response_query_template(service, operation):
    """Generate the XML response template constant for *operation*.

    Assume that response format is xml when protocol is query.
    You can see example of elbv2 from link below.
    https://github.com/boto/botocore/blob/develop/botocore/data/elbv2/2015-12-01/service-2.json

    Returns:
        Source text of the form ``<OPERATION>_TEMPLATE = \"\"\"...\"\"\"``,
        preceded by a newline.

    Raises:
        Exception: if a list element's opening or closing tag does not occur
            on exactly one line of the generated XML.
    """
    # Pin the region so client creation does not require a configured default.
    service_model = boto3.client(service, region_name="us-east-1").meta.service_model
    aws_operation_name = get_operation_name_in_keys(
        to_upper_camel_case(operation), service_model.operation_names
    )
    op_model = service_model.operation_model(aws_operation_name)
    result_wrapper = op_model.output_shape.serialization["resultWrapper"]
    response_wrapper = result_wrapper.replace("Result", "Response")
    xml_namespace = op_model.metadata["xmlNamespace"]

    # build xml tree
    t_root = etree.Element(response_wrapper, xmlns=xml_namespace)

    # build metadata
    t_metadata = etree.Element("ResponseMetadata")
    t_request_id = etree.Element("RequestId")
    t_request_id.text = "1549581b-12b7-11e3-895e-1334aEXAMPLE"
    t_metadata.append(t_request_id)
    t_root.append(t_metadata)

    # build result
    t_result = etree.Element(result_wrapper)
    replace_list = []
    for output_name, output_shape in op_model.output_shape.members.items():
        t_result.append(_get_subtree(output_name, output_shape, replace_list))
    t_root.append(t_result)

    xml_body = etree.tostring(t_root, pretty_print=True).decode("utf-8")
    xml_body_lines = xml_body.splitlines()

    # Wrap the content of every list element in a Jinja for-loop.
    for name, prefix in replace_list:
        singular_name = singularize(name)
        start_tag = "<%s>" % name
        end_tag = "</%s>" % name
        iter_name = "{}.{}".format(prefix[-1], name.lower()) if prefix else name.lower()
        loop_start = "{%% for %s in %s %%}" % (singular_name.lower(), iter_name)
        # A loop is closed by a statement tag; "{{ endfor }}" would only
        # print an undefined variable and leave the for-block open.
        loop_end = "{% endfor %}"

        start_tag_indexes = [
            index for index, line in enumerate(xml_body_lines) if start_tag in line
        ]
        if len(start_tag_indexes) != 1:
            raise Exception(
                "expected tag %s exactly once in response body, found %d"
                % (start_tag, len(start_tag_indexes))
            )
        xml_body_lines.insert(start_tag_indexes[0] + 1, loop_start)

        # Searched after the insertion above so the index is already shifted.
        end_tag_indexes = [
            index for index, line in enumerate(xml_body_lines) if end_tag in line
        ]
        if len(end_tag_indexes) != 1:
            raise Exception(
                "expected tag %s exactly once in response body, found %d"
                % (end_tag, len(end_tag_indexes))
            )
        xml_body_lines.insert(end_tag_indexes[0], loop_end)

    xml_body = "\n".join(xml_body_lines)
    return '\n{}_TEMPLATE = """{}"""'.format(operation.upper(), xml_body)
def insert_code_to_class(path, base_class, new_code):
    """Insert *new_code*, indented by four spaces, after a class in *path*.

    The file at *path* is imported as a module (``/`` in the path becomes
    ``.``) and must contain exactly one class that is a subclass of
    *base_class* other than *base_class* itself. The insertion point is
    derived from that class's source position, and the file is rewritten.

    Raises:
        Exception: if the number of matching classes is not exactly one.
    """
    with open(path) as fhandle:
        file_lines = [raw.replace("\n", "") for raw in fhandle.readlines()]

    module_name = os.path.splitext(path)[0].replace("/", ".")
    module = importlib.import_module(module_name)

    candidates = []
    for _, cls in inspect.getmembers(module, inspect.isclass):
        if issubclass(cls, base_class) and cls != base_class:
            candidates.append(cls)
    if len(candidates) != 1:
        raise Exception("unknown error, number of clsmembers is not 1")
    (target_cls,) = candidates

    class_source, first_line_no = inspect.getsourcelines(target_cls)
    insert_at = first_line_no + len(class_source)

    indented_code = [" " * 4 + code_line for code_line in new_code.splitlines()]
    file_lines[insert_at:insert_at] = indented_code

    with open(path, "w") as fhandle:
        fhandle.write("\n".join(file_lines) + "\n")
2020-10-06 06:46:05 +00:00
def insert_url(service, operation, api_protocol):
    """Add a URL pattern for *operation* to the service's ``urls.py``.

    The pattern is inserted as the last entry of the ``url_paths`` block; a
    trailing comma is added to the previous entry when it has none. The file
    is left untouched when a line already matches the operation's request URI
    or the exact new line is already present.

    Raises:
        ValueError: if no line starting with ``}`` follows a line starting
            with ``url_paths``.
    """
    # Pin the region so client creation does not require a configured default.
    client = boto3.client(service, region_name="us-east-1")
    service_class = client.__class__.__name__
    service_model = client.meta.service_model
    aws_operation_name = get_operation_name_in_keys(
        to_upper_camel_case(operation), service_model.operation_names
    )
    uri = service_model.operation_model(aws_operation_name).http["requestUri"]

    path = os.path.join(
        os.path.dirname(__file__), "..", "moto", get_escaped_service(service), "urls.py"
    )
    with open(path) as fhandle:
        lines = [line.replace("\n", "") for line in fhandle.readlines()]

    # NOTE(review): the URI is used as an unescaped regex anchored at the
    # start of each line; confirm this is the intended duplicate check.
    if any(line for line in lines if re.match(uri, line)):
        return

    url_paths_found = False
    last_elem_line_index = None
    for i, line in enumerate(lines):
        if line.startswith("url_paths"):
            url_paths_found = True
        if url_paths_found and line.startswith("}"):
            last_elem_line_index = i - 1
    if last_elem_line_index is None:
        # Without this guard the code below would edit the file's last line
        # and insert the pattern at the very top of the file.
        raise ValueError("could not find the end of url_paths in {}".format(path))

    prev_line = lines[last_elem_line_index]
    if not prev_line.endswith("{") and not prev_line.endswith(","):
        lines[last_elem_line_index] += ","

    # generate url pattern
    if api_protocol == "rest-json":
        new_line = " '{0}/.*$': response.dispatch,"
    else:
        new_line = " '{0}%s$': %sResponse.dispatch," % (uri, service_class)
    if new_line in lines:
        return
    lines.insert(last_elem_line_index + 1, new_line)

    body = "\n".join(lines) + "\n"
    with open(path, "w") as fhandle:
        fhandle.write(body)
2020-10-06 06:46:05 +00:00
def insert_codes(service, operation, api_protocol):
    """Insert the generated code for *operation* into the service's modules.

    Adds the response method to ``responses.py`` (plus the XML template when
    *api_protocol* is ``"query"``), the backend method to ``models.py`` and
    the URL pattern to ``urls.py``.
    """
    func_in_responses = get_function_in_responses(service, operation, api_protocol)
    func_in_models = get_function_in_models(service, operation)

    # edit responses.py
    responses_path = "moto/{}/responses.py".format(get_escaped_service(service))
    print_progress("inserting code", responses_path, "green")
    insert_code_to_class(responses_path, BaseResponse, func_in_responses)

    # insert template
    if api_protocol == "query":
        template = get_response_query_template(service, operation)
        with open(responses_path) as fhandle:
            # Strip only the newline: slicing off the last character would
            # eat real content on a final line that has no newline.
            lines = [line.rstrip("\n") for line in fhandle.readlines()]
        lines += template.splitlines()
        with open(responses_path, "w") as fhandle:
            # End the file with a newline, like the other writers in this script.
            fhandle.write("\n".join(lines) + "\n")

    # edit models.py
    models_path = "moto/{}/models.py".format(get_escaped_service(service))
    print_progress("inserting code", models_path, "green")
    insert_code_to_class(models_path, BaseBackend, func_in_models)

    # edit urls.py
    insert_url(service, operation, api_protocol)
@click.command()
def main():
    """Scaffold a moto service and one operation chosen interactively."""
    service, operation = select_service_and_operation()
    # Pin the region, as select_service_and_operation does, so client
    # creation does not depend on a default region being configured.
    service_model = boto3.client(service, region_name="us-east-1").meta.service_model
    api_protocol = service_model.metadata["protocol"]
    initialize_service(service, api_protocol)

    if api_protocol in ["query", "json", "rest-json"]:
        insert_codes(service, operation, api_protocol)
    else:
        print_progress(
            "skip inserting code",
            'api protocol "{}" is not supported'.format(api_protocol),
            "yellow",
        )

    click.echo(
        'You will still need to add the mock into "docs/index.rst" and '
        '"IMPLEMENTATION_COVERAGE.md"'
    )
2017-09-26 16:33:19 +00:00
2020-10-06 06:46:05 +00:00
# Run the interactive command only when this file is executed as a script.
if __name__ == "__main__":
    main()