moto/moto/apigateway/responses.py
2020-04-25 03:13:36 -05:00

665 lines
26 KiB
Python

from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import apigateway_backends
from .exceptions import (
ApiKeyNotFoundException,
BadRequestException,
CrossAccountNotAllowed,
AuthorizerNotFoundException,
StageNotFoundException,
ApiKeyAlreadyExists,
DomainNameNotFound,
InvalidDomainName,
InvalidRestApiId,
InvalidModelName,
RestAPINotFound,
ModelNotFound,
)
API_KEY_SOURCES = ["AUTHORIZER", "HEADER"]
AUTHORIZER_TYPES = ["TOKEN", "REQUEST", "COGNITO_USER_POOLS"]
ENDPOINT_CONFIGURATION_TYPES = ["PRIVATE", "EDGE", "REGIONAL"]
class APIGatewayResponse(BaseResponse):
def error(self, type_, message, status=400):
return (
status,
self.response_headers,
json.dumps({"__type": type_, "message": message}),
)
def _get_param(self, key):
return json.loads(self.body).get(key) if self.body else None
def _get_param_with_default_value(self, key, default):
jsonbody = json.loads(self.body)
if key in jsonbody:
return jsonbody.get(key)
else:
return default
@property
def backend(self):
return apigateway_backends[self.region]
def restapis(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if self.method == "GET":
apis = self.backend.list_apis()
return 200, {}, json.dumps({"item": [api.to_dict() for api in apis]})
elif self.method == "POST":
name = self._get_param("name")
description = self._get_param("description")
api_key_source = self._get_param("apiKeySource")
endpoint_configuration = self._get_param("endpointConfiguration")
tags = self._get_param("tags")
policy = self._get_param("policy")
# Param validation
if api_key_source and api_key_source not in API_KEY_SOURCES:
return self.error(
"ValidationException",
(
"1 validation error detected: "
"Value '{api_key_source}' at 'createRestApiInput.apiKeySource' failed "
"to satisfy constraint: Member must satisfy enum value set: "
"[AUTHORIZER, HEADER]"
).format(api_key_source=api_key_source),
)
if endpoint_configuration and "types" in endpoint_configuration:
invalid_types = list(
set(endpoint_configuration["types"])
- set(ENDPOINT_CONFIGURATION_TYPES)
)
if invalid_types:
return self.error(
"ValidationException",
(
"1 validation error detected: Value '{endpoint_type}' "
"at 'createRestApiInput.endpointConfiguration.types' failed "
"to satisfy constraint: Member must satisfy enum value set: "
"[PRIVATE, EDGE, REGIONAL]"
).format(endpoint_type=invalid_types[0]),
)
rest_api = self.backend.create_rest_api(
name,
description,
api_key_source=api_key_source,
endpoint_configuration=endpoint_configuration,
tags=tags,
policy=policy,
)
return 200, {}, json.dumps(rest_api.to_dict())
def restapis_individual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
function_id = self.path.replace("/restapis/", "", 1).split("/")[0]
if self.method == "GET":
rest_api = self.backend.get_rest_api(function_id)
return 200, {}, json.dumps(rest_api.to_dict())
elif self.method == "DELETE":
rest_api = self.backend.delete_rest_api(function_id)
return 200, {}, json.dumps(rest_api.to_dict())
def resources(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
function_id = self.path.replace("/restapis/", "", 1).split("/")[0]
if self.method == "GET":
resources = self.backend.list_resources(function_id)
return (
200,
{},
json.dumps({"item": [resource.to_dict() for resource in resources]}),
)
def resource_individual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
function_id = self.path.replace("/restapis/", "", 1).split("/")[0]
resource_id = self.path.split("/")[-1]
try:
if self.method == "GET":
resource = self.backend.get_resource(function_id, resource_id)
elif self.method == "POST":
path_part = self._get_param("pathPart")
resource = self.backend.create_resource(
function_id, resource_id, path_part
)
elif self.method == "DELETE":
resource = self.backend.delete_resource(function_id, resource_id)
return 200, {}, json.dumps(resource.to_dict())
except BadRequestException as e:
return self.error(
"com.amazonaws.dynamodb.v20111205#BadRequestException", e.message
)
def resource_methods(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
function_id = url_path_parts[2]
resource_id = url_path_parts[4]
method_type = url_path_parts[6]
if self.method == "GET":
method = self.backend.get_method(function_id, resource_id, method_type)
return 200, {}, json.dumps(method)
elif self.method == "PUT":
authorization_type = self._get_param("authorizationType")
api_key_required = self._get_param("apiKeyRequired")
method = self.backend.create_method(
function_id,
resource_id,
method_type,
authorization_type,
api_key_required,
)
return 200, {}, json.dumps(method)
def resource_method_responses(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
function_id = url_path_parts[2]
resource_id = url_path_parts[4]
method_type = url_path_parts[6]
response_code = url_path_parts[8]
if self.method == "GET":
method_response = self.backend.get_method_response(
function_id, resource_id, method_type, response_code
)
elif self.method == "PUT":
method_response = self.backend.create_method_response(
function_id, resource_id, method_type, response_code
)
elif self.method == "DELETE":
method_response = self.backend.delete_method_response(
function_id, resource_id, method_type, response_code
)
return 200, {}, json.dumps(method_response)
def restapis_authorizers(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
restapi_id = url_path_parts[2]
if self.method == "POST":
name = self._get_param("name")
authorizer_type = self._get_param("type")
provider_arns = self._get_param_with_default_value("providerARNs", None)
auth_type = self._get_param_with_default_value("authType", None)
authorizer_uri = self._get_param_with_default_value("authorizerUri", None)
authorizer_credentials = self._get_param_with_default_value(
"authorizerCredentials", None
)
identity_source = self._get_param_with_default_value("identitySource", None)
identiy_validation_expression = self._get_param_with_default_value(
"identityValidationExpression", None
)
authorizer_result_ttl = self._get_param_with_default_value(
"authorizerResultTtlInSeconds", 300
)
# Param validation
if authorizer_type and authorizer_type not in AUTHORIZER_TYPES:
return self.error(
"ValidationException",
(
"1 validation error detected: "
"Value '{authorizer_type}' at 'createAuthorizerInput.type' failed "
"to satisfy constraint: Member must satisfy enum value set: "
"[TOKEN, REQUEST, COGNITO_USER_POOLS]"
).format(authorizer_type=authorizer_type),
)
authorizer_response = self.backend.create_authorizer(
restapi_id,
name,
authorizer_type,
provider_arns=provider_arns,
auth_type=auth_type,
authorizer_uri=authorizer_uri,
authorizer_credentials=authorizer_credentials,
identity_source=identity_source,
identiy_validation_expression=identiy_validation_expression,
authorizer_result_ttl=authorizer_result_ttl,
)
elif self.method == "GET":
authorizers = self.backend.get_authorizers(restapi_id)
return 200, {}, json.dumps({"item": authorizers})
return 200, {}, json.dumps(authorizer_response)
def authorizers(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
restapi_id = url_path_parts[2]
authorizer_id = url_path_parts[4]
if self.method == "GET":
try:
authorizer_response = self.backend.get_authorizer(
restapi_id, authorizer_id
)
except AuthorizerNotFoundException as error:
return (
error.code,
{},
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)
elif self.method == "PATCH":
patch_operations = self._get_param("patchOperations")
authorizer_response = self.backend.update_authorizer(
restapi_id, authorizer_id, patch_operations
)
elif self.method == "DELETE":
self.backend.delete_authorizer(restapi_id, authorizer_id)
return 202, {}, "{}"
return 200, {}, json.dumps(authorizer_response)
def restapis_stages(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
function_id = url_path_parts[2]
if self.method == "POST":
stage_name = self._get_param("stageName")
deployment_id = self._get_param("deploymentId")
stage_variables = self._get_param_with_default_value("variables", {})
description = self._get_param_with_default_value("description", "")
cacheClusterEnabled = self._get_param_with_default_value(
"cacheClusterEnabled", False
)
cacheClusterSize = self._get_param_with_default_value(
"cacheClusterSize", None
)
stage_response = self.backend.create_stage(
function_id,
stage_name,
deployment_id,
variables=stage_variables,
description=description,
cacheClusterEnabled=cacheClusterEnabled,
cacheClusterSize=cacheClusterSize,
)
elif self.method == "GET":
stages = self.backend.get_stages(function_id)
return 200, {}, json.dumps({"item": stages})
return 200, {}, json.dumps(stage_response)
def stages(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
function_id = url_path_parts[2]
stage_name = url_path_parts[4]
if self.method == "GET":
try:
stage_response = self.backend.get_stage(function_id, stage_name)
except StageNotFoundException as error:
return (
error.code,
{},
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)
elif self.method == "PATCH":
patch_operations = self._get_param("patchOperations")
stage_response = self.backend.update_stage(
function_id, stage_name, patch_operations
)
elif self.method == "DELETE":
self.backend.delete_stage(function_id, stage_name)
return 202, {}, "{}"
return 200, {}, json.dumps(stage_response)
def integrations(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
function_id = url_path_parts[2]
resource_id = url_path_parts[4]
method_type = url_path_parts[6]
try:
if self.method == "GET":
integration_response = self.backend.get_integration(
function_id, resource_id, method_type
)
elif self.method == "PUT":
integration_type = self._get_param("type")
uri = self._get_param("uri")
integration_http_method = self._get_param("httpMethod")
creds = self._get_param("credentials")
request_templates = self._get_param("requestTemplates")
integration_response = self.backend.create_integration(
function_id,
resource_id,
method_type,
integration_type,
uri,
credentials=creds,
integration_method=integration_http_method,
request_templates=request_templates,
)
elif self.method == "DELETE":
integration_response = self.backend.delete_integration(
function_id, resource_id, method_type
)
return 200, {}, json.dumps(integration_response)
except BadRequestException as e:
return self.error(
"com.amazonaws.dynamodb.v20111205#BadRequestException", e.message
)
except CrossAccountNotAllowed as e:
return self.error(
"com.amazonaws.dynamodb.v20111205#AccessDeniedException", e.message
)
def integration_responses(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
function_id = url_path_parts[2]
resource_id = url_path_parts[4]
method_type = url_path_parts[6]
status_code = url_path_parts[9]
try:
if self.method == "GET":
integration_response = self.backend.get_integration_response(
function_id, resource_id, method_type, status_code
)
elif self.method == "PUT":
selection_pattern = self._get_param("selectionPattern")
response_templates = self._get_param("responseTemplates")
integration_response = self.backend.create_integration_response(
function_id,
resource_id,
method_type,
status_code,
selection_pattern,
response_templates,
)
elif self.method == "DELETE":
integration_response = self.backend.delete_integration_response(
function_id, resource_id, method_type, status_code
)
return 200, {}, json.dumps(integration_response)
except BadRequestException as e:
return self.error(
"com.amazonaws.dynamodb.v20111205#BadRequestException", e.message
)
def deployments(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
function_id = self.path.replace("/restapis/", "", 1).split("/")[0]
try:
if self.method == "GET":
deployments = self.backend.get_deployments(function_id)
return 200, {}, json.dumps({"item": deployments})
elif self.method == "POST":
name = self._get_param("stageName")
description = self._get_param_with_default_value("description", "")
stage_variables = self._get_param_with_default_value("variables", {})
deployment = self.backend.create_deployment(
function_id, name, description, stage_variables
)
return 200, {}, json.dumps(deployment)
except BadRequestException as e:
return self.error(
"com.amazonaws.dynamodb.v20111205#BadRequestException", e.message
)
def individual_deployment(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
function_id = url_path_parts[2]
deployment_id = url_path_parts[4]
if self.method == "GET":
deployment = self.backend.get_deployment(function_id, deployment_id)
elif self.method == "DELETE":
deployment = self.backend.delete_deployment(function_id, deployment_id)
return 200, {}, json.dumps(deployment)
def apikeys(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if self.method == "POST":
try:
apikey_response = self.backend.create_apikey(json.loads(self.body))
except ApiKeyAlreadyExists as error:
return (
error.code,
self.headers,
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)
elif self.method == "GET":
apikeys_response = self.backend.get_apikeys()
return 200, {}, json.dumps({"item": apikeys_response})
return 200, {}, json.dumps(apikey_response)
def apikey_individual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
apikey = url_path_parts[2]
if self.method == "GET":
apikey_response = self.backend.get_apikey(apikey)
elif self.method == "PATCH":
patch_operations = self._get_param("patchOperations")
apikey_response = self.backend.update_apikey(apikey, patch_operations)
elif self.method == "DELETE":
apikey_response = self.backend.delete_apikey(apikey)
return 200, {}, json.dumps(apikey_response)
def usage_plans(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if self.method == "POST":
usage_plan_response = self.backend.create_usage_plan(json.loads(self.body))
elif self.method == "GET":
api_key_id = self.querystring.get("keyId", [None])[0]
usage_plans_response = self.backend.get_usage_plans(api_key_id=api_key_id)
return 200, {}, json.dumps({"item": usage_plans_response})
return 200, {}, json.dumps(usage_plan_response)
def usage_plan_individual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
usage_plan = url_path_parts[2]
if self.method == "GET":
usage_plan_response = self.backend.get_usage_plan(usage_plan)
elif self.method == "DELETE":
usage_plan_response = self.backend.delete_usage_plan(usage_plan)
return 200, {}, json.dumps(usage_plan_response)
def usage_plan_keys(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
usage_plan_id = url_path_parts[2]
if self.method == "POST":
try:
usage_plan_response = self.backend.create_usage_plan_key(
usage_plan_id, json.loads(self.body)
)
except ApiKeyNotFoundException as error:
return (
error.code,
{},
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)
elif self.method == "GET":
usage_plans_response = self.backend.get_usage_plan_keys(usage_plan_id)
return 200, {}, json.dumps({"item": usage_plans_response})
return 200, {}, json.dumps(usage_plan_response)
def usage_plan_key_individual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
usage_plan_id = url_path_parts[2]
key_id = url_path_parts[4]
if self.method == "GET":
usage_plan_response = self.backend.get_usage_plan_key(usage_plan_id, key_id)
elif self.method == "DELETE":
usage_plan_response = self.backend.delete_usage_plan_key(
usage_plan_id, key_id
)
return 200, {}, json.dumps(usage_plan_response)
def domain_names(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
try:
if self.method == "GET":
domain_names = self.backend.get_domain_names()
return 200, {}, json.dumps({"item": domain_names})
elif self.method == "POST":
domain_name = self._get_param("domainName")
certificate_name = self._get_param("certificateName")
tags = self._get_param("tags")
certificate_arn = self._get_param("certificateArn")
certificate_body = self._get_param("certificateBody")
certificate_private_key = self._get_param("certificatePrivateKey")
certificate_chain = self._get_param("certificateChain")
regional_certificate_name = self._get_param("regionalCertificateName")
regional_certificate_arn = self._get_param("regionalCertificateArn")
endpoint_configuration = self._get_param("endpointConfiguration")
security_policy = self._get_param("securityPolicy")
generate_cli_skeleton = self._get_param("generateCliSkeleton")
domain_name_resp = self.backend.create_domain_name(
domain_name,
certificate_name,
tags,
certificate_arn,
certificate_body,
certificate_private_key,
certificate_chain,
regional_certificate_name,
regional_certificate_arn,
endpoint_configuration,
security_policy,
generate_cli_skeleton,
)
return 200, {}, json.dumps(domain_name_resp)
except InvalidDomainName as error:
return (
error.code,
{},
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)
def domain_name_induvidual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
domain_name = url_path_parts[2]
domain_names = {}
try:
if self.method == "GET":
if domain_name is not None:
domain_names = self.backend.get_domain_name(domain_name)
return 200, {}, json.dumps(domain_names)
except DomainNameNotFound as error:
return (
error.code,
{},
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)
def models(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
rest_api_id = self.path.replace("/restapis/", "", 1).split("/")[0]
try:
if self.method == "GET":
models = self.backend.get_models(rest_api_id)
return 200, {}, json.dumps({"item": models})
elif self.method == "POST":
name = self._get_param("name")
description = self._get_param("description")
schema = self._get_param("schema")
content_type = self._get_param("contentType")
cli_input_json = self._get_param("cliInputJson")
generate_cli_skeleton = self._get_param("generateCliSkeleton")
model = self.backend.create_model(
rest_api_id,
name,
content_type,
description,
schema,
cli_input_json,
generate_cli_skeleton,
)
return 200, {}, json.dumps(model)
except (InvalidRestApiId, InvalidModelName, RestAPINotFound) as error:
return (
error.code,
{},
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)
def model_induvidual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
rest_api_id = url_path_parts[2]
model_name = url_path_parts[4]
model_info = {}
try:
if self.method == "GET":
model_info = self.backend.get_model(rest_api_id, model_name)
return 200, {}, json.dumps(model_info)
except (
ModelNotFound,
RestAPINotFound,
InvalidRestApiId,
InvalidModelName,
) as error:
return (
error.code,
{},
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)