import json import sys from typing import Any, Dict, List, Tuple, Union from urllib.parse import unquote from moto.core.utils import path_url from moto.utilities.aws_headers import amz_crc32, amzn_request_id from moto.core.responses import BaseResponse, TYPE_RESPONSE from .exceptions import FunctionAlreadyExists, UnknownFunctionException from .models import lambda_backends, LambdaBackend class LambdaResponse(BaseResponse): def __init__(self) -> None: super().__init__(service_name="awslambda") @property def json_body(self) -> Dict[str, Any]: # type: ignore[misc] return json.loads(self.body) @property def backend(self) -> LambdaBackend: return lambda_backends[self.current_account][self.region] def root(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "GET": return self._list_functions() elif request.method == "POST": return self._create_function() else: raise ValueError("Cannot handle request") def event_source_mappings( self, request: Any, full_url: str, headers: Any ) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "GET": querystring = self.querystring event_source_arn = querystring.get("EventSourceArn", [None])[0] function_name = querystring.get("FunctionName", [None])[0] return self._list_event_source_mappings(event_source_arn, function_name) elif request.method == "POST": return self._create_event_source_mapping() else: raise ValueError("Cannot handle request") def aliases(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "POST": return self._create_alias() elif request.method == "GET": path = request.path if hasattr(request, "path") else path_url(request.url) function_name = path.split("/")[-2] return self._list_aliases(function_name) def alias(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "DELETE": return self._delete_alias() elif request.method == "GET": return self._get_alias() elif request.method == "PUT": return self._update_alias() def event_source_mapping( self, request: Any, full_url: str, headers: Any ) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) path = request.path if hasattr(request, "path") else path_url(request.url) uuid = path.split("/")[-1] if request.method == "GET": return self._get_event_source_mapping(uuid) elif request.method == "PUT": return self._update_event_source_mapping(uuid) elif request.method == "DELETE": return self._delete_event_source_mapping(uuid) else: raise ValueError("Cannot handle request") def list_layers(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "GET": return self._list_layers() def layers_version(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) layer_name = unquote(self.path.split("/")[-3]) layer_version = self.path.split("/")[-1] if request.method == "DELETE": return self._delete_layer_version(layer_name, layer_version) elif request.method == "GET": return self._get_layer_version(layer_name, layer_version) def layers_versions(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "GET": return self._get_layer_versions() if request.method == "POST": return self._publish_layer_version() def function(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "GET": return self._get_function() elif request.method == "DELETE": return self._delete_function() else: raise ValueError("Cannot handle request") def versions(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "GET": # This is ListVersionByFunction path = request.path if hasattr(request, "path") else path_url(request.url) function_name = path.split("/")[-2] return self._list_versions_by_function(function_name) elif request.method == "POST": return self._publish_function() else: raise ValueError("Cannot handle request") @amz_crc32 @amzn_request_id def invoke( self, request: Any, full_url: str, headers: Any ) -> Tuple[int, Dict[str, str], Union[str, bytes]]: self.setup_class(request, full_url, headers) if request.method == "POST": return self._invoke(request) else: raise ValueError("Cannot handle request") @amz_crc32 @amzn_request_id def invoke_async( self, request: Any, full_url: str, headers: Any ) -> Tuple[int, Dict[str, str], Union[str, bytes]]: self.setup_class(request, full_url, headers) if request.method == "POST": return self._invoke_async() else: raise ValueError("Cannot handle request") def tag(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "GET": return self._list_tags() elif request.method == "POST": return self._tag_resource() elif request.method == "DELETE": return self._untag_resource() else: raise ValueError(f"Cannot handle {request.method} request") def policy(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "GET": return self._get_policy(request) elif request.method == "POST": return self._add_policy(request) elif request.method == "DELETE": return self._del_policy(request, self.querystring) else: raise ValueError(f"Cannot handle {request.method} request") def configuration(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "PUT": return self._put_configuration() if request.method == "GET": return self._get_function_configuration() else: raise ValueError("Cannot handle request") def code(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: self.setup_class(request, full_url, headers) if request.method == "PUT": return self._put_code() else: raise ValueError("Cannot handle request") def code_signing_config(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] self.setup_class(request, full_url, headers) if request.method == "GET": return self._get_code_signing_config() def function_concurrency( self, request: Any, full_url: str, headers: Any ) -> TYPE_RESPONSE: http_method = request.method self.setup_class(request, full_url, headers) if http_method == "GET": return self._get_function_concurrency() elif http_method == "DELETE": return self._delete_function_concurrency() elif http_method == "PUT": return self._put_function_concurrency() else: raise ValueError("Cannot handle request") def function_url_config(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] http_method = request.method self.setup_class(request, full_url, headers) if http_method == "DELETE": return self._delete_function_url_config() elif http_method == "GET": return self._get_function_url_config() elif http_method == "POST": return self._create_function_url_config() elif http_method == "PUT": return self._update_function_url_config() def _add_policy(self, request: Any) -> TYPE_RESPONSE: path = request.path if hasattr(request, "path") else path_url(request.url) function_name = unquote(path.split("/")[-2]) qualifier = self.querystring.get("Qualifier", [None])[0] statement = self.body statement = self.backend.add_permission(function_name, qualifier, statement) return 200, {}, json.dumps({"Statement": json.dumps(statement)}) def _get_policy(self, request: Any) -> TYPE_RESPONSE: path = request.path if hasattr(request, "path") else path_url(request.url) function_name = unquote(path.split("/")[-2]) out = self.backend.get_policy(function_name) return 200, {}, out def _del_policy(self, request: Any, querystring: Dict[str, Any]) -> TYPE_RESPONSE: path = request.path if hasattr(request, "path") else path_url(request.url) function_name = unquote(path.split("/")[-3]) statement_id = path.split("/")[-1].split("?")[0] revision = querystring.get("RevisionId", "") if self.backend.get_function(function_name): self.backend.remove_permission(function_name, statement_id, revision) return 204, {}, "{}" else: return 404, {}, "{}" def _invoke(self, request: Any) -> Tuple[int, Dict[str, str], Union[str, bytes]]: response_headers: Dict[str, str] = {} # URL Decode in case it's a ARN: function_name = unquote(self.path.rsplit("/", 2)[-2]) qualifier = self._get_param("qualifier") payload = self.backend.invoke( function_name, qualifier, self.body, self.headers, response_headers ) if payload: if request.headers.get("X-Amz-Invocation-Type") != "Event": if sys.getsizeof(payload) > 6000000: response_headers["Content-Length"] = "142" response_headers["x-amz-function-error"] = "Unhandled" error_dict = { "errorMessage": "Response payload size exceeded maximum allowed payload size (6291556 bytes).", "errorType": "Function.ResponseSizeTooLarge", } payload = json.dumps(error_dict).encode("utf-8") response_headers["content-type"] = "application/json" if request.headers.get("X-Amz-Invocation-Type") == "Event": status_code = 202 elif request.headers.get("X-Amz-Invocation-Type") == "DryRun": status_code = 204 else: if request.headers.get("X-Amz-Log-Type") != "Tail": del response_headers["x-amz-log-result"] status_code = 200 return status_code, response_headers, payload else: return 404, response_headers, "{}" def _invoke_async(self) -> Tuple[int, Dict[str, str], Union[str, bytes]]: response_headers: Dict[str, Any] = {} function_name = unquote(self.path.rsplit("/", 3)[-3]) fn = self.backend.get_function(function_name, None) payload = fn.invoke(self.body, self.headers, response_headers) response_headers["Content-Length"] = str(len(payload)) return 202, response_headers, payload def _list_functions(self) -> TYPE_RESPONSE: querystring = self.querystring func_version = querystring.get("FunctionVersion", [None])[0] result: Dict[str, List[Dict[str, Any]]] = {"Functions": []} for fn in self.backend.list_functions(func_version): json_data = fn.get_configuration() result["Functions"].append(json_data) return 200, {}, json.dumps(result) def _list_versions_by_function(self, function_name: str) -> TYPE_RESPONSE: result: Dict[str, Any] = {"Versions": []} functions = self.backend.list_versions_by_function(function_name) for fn in functions: json_data = fn.get_configuration() result["Versions"].append(json_data) return 200, {}, json.dumps(result) def _list_aliases(self, function_name: str) -> TYPE_RESPONSE: result: Dict[str, Any] = {"Aliases": []} aliases = self.backend.list_aliases(function_name) for alias in aliases: json_data = alias.to_json() result["Aliases"].append(json_data) return 200, {}, json.dumps(result) def _create_function(self) -> TYPE_RESPONSE: function_name = self.json_body["FunctionName"].rsplit(":", 1)[-1] try: self.backend.get_function(function_name, None) except UnknownFunctionException: fn = self.backend.create_function(self.json_body) config = fn.get_configuration(on_create=True) return 201, {}, json.dumps(config) raise FunctionAlreadyExists(function_name) def _create_function_url_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) config = self.backend.create_function_url_config(function_name, self.json_body) return 201, {}, json.dumps(config.to_dict()) def _delete_function_url_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) self.backend.delete_function_url_config(function_name) return 204, {}, "{}" def _get_function_url_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) config = self.backend.get_function_url_config(function_name) return 201, {}, json.dumps(config.to_dict()) def _update_function_url_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) config = self.backend.update_function_url_config(function_name, self.json_body) return 200, {}, json.dumps(config.to_dict()) def _create_event_source_mapping(self) -> TYPE_RESPONSE: fn = self.backend.create_event_source_mapping(self.json_body) config = fn.get_configuration() return 201, {}, json.dumps(config) def _list_event_source_mappings( self, event_source_arn: str, function_name: str ) -> TYPE_RESPONSE: esms = self.backend.list_event_source_mappings(event_source_arn, function_name) result = {"EventSourceMappings": [esm.get_configuration() for esm in esms]} return 200, {}, json.dumps(result) def _get_event_source_mapping(self, uuid: str) -> TYPE_RESPONSE: result = self.backend.get_event_source_mapping(uuid) if result: return 200, {}, json.dumps(result.get_configuration()) else: return 404, {}, "{}" def _update_event_source_mapping(self, uuid: str) -> TYPE_RESPONSE: result = self.backend.update_event_source_mapping(uuid, self.json_body) if result: return 202, {}, json.dumps(result.get_configuration()) else: return 404, {}, "{}" def _delete_event_source_mapping(self, uuid: str) -> TYPE_RESPONSE: esm = self.backend.delete_event_source_mapping(uuid) if esm: json_result = esm.get_configuration() json_result.update({"State": "Deleting"}) return 202, {}, json.dumps(json_result) else: return 404, {}, "{}" def _publish_function(self) -> TYPE_RESPONSE: function_name = unquote(self.path.split("/")[-2]) description = self._get_param("Description") fn = self.backend.publish_function(function_name, description) config = fn.get_configuration() # type: ignore[union-attr] return 201, {}, json.dumps(config) def _delete_function(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 1)[-1]) qualifier = self._get_param("Qualifier", None) self.backend.delete_function(function_name, qualifier) return 204, {}, "" @staticmethod def _set_configuration_qualifier(configuration: Dict[str, Any], qualifier: str) -> Dict[str, Any]: # type: ignore[misc] if qualifier is None or qualifier == "$LATEST": configuration["Version"] = "$LATEST" if qualifier == "$LATEST": configuration["FunctionArn"] += ":$LATEST" return configuration def _get_function(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 1)[-1]) qualifier = self._get_param("Qualifier", None) fn = self.backend.get_function(function_name, qualifier) code = fn.get_code() code["Configuration"] = self._set_configuration_qualifier( code["Configuration"], qualifier ) return 200, {}, json.dumps(code) def _get_function_configuration(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[-2]) qualifier = self._get_param("Qualifier", None) fn = self.backend.get_function(function_name, qualifier) configuration = self._set_configuration_qualifier( fn.get_configuration(), qualifier ) return 200, {}, json.dumps(configuration) def _get_aws_region(self, full_url: str) -> str: region = self.region_regex.search(full_url) if region: return region.group(1) else: return self.default_region def _list_tags(self) -> TYPE_RESPONSE: function_arn = unquote(self.path.rsplit("/", 1)[-1]) tags = self.backend.list_tags(function_arn) return 200, {}, json.dumps({"Tags": tags}) def _tag_resource(self) -> TYPE_RESPONSE: function_arn = unquote(self.path.rsplit("/", 1)[-1]) self.backend.tag_resource(function_arn, self.json_body["Tags"]) return 200, {}, "{}" def _untag_resource(self) -> TYPE_RESPONSE: function_arn = unquote(self.path.rsplit("/", 1)[-1]) tag_keys = self.querystring["tagKeys"] self.backend.untag_resource(function_arn, tag_keys) return 204, {}, "{}" def _put_configuration(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[-2]) qualifier = self._get_param("Qualifier", None) resp = self.backend.update_function_configuration( function_name, qualifier, body=self.json_body ) if resp: return 200, {}, json.dumps(resp) else: return 404, {}, "{}" def _put_code(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[-2]) qualifier = self._get_param("Qualifier", None) resp = self.backend.update_function_code( function_name, qualifier, body=self.json_body ) if resp: return 200, {}, json.dumps(resp) else: return 404, {}, "{}" def _get_code_signing_config(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[-2]) resp = self.backend.get_code_signing_config(function_name) return 200, {}, json.dumps(resp) def _get_function_concurrency(self) -> TYPE_RESPONSE: path_function_name = unquote(self.path.rsplit("/", 2)[-2]) function_name = self.backend.get_function(path_function_name) if function_name is None: return 404, {}, "{}" resp = self.backend.get_function_concurrency(path_function_name) return 200, {}, json.dumps({"ReservedConcurrentExecutions": resp}) def _delete_function_concurrency(self) -> TYPE_RESPONSE: path_function_name = unquote(self.path.rsplit("/", 2)[-2]) function_name = self.backend.get_function(path_function_name) if function_name is None: return 404, {}, "{}" self.backend.delete_function_concurrency(path_function_name) return 204, {}, "{}" def _put_function_concurrency(self) -> TYPE_RESPONSE: path_function_name = unquote(self.path.rsplit("/", 2)[-2]) function = self.backend.get_function(path_function_name) if function is None: return 404, {}, "{}" concurrency = self._get_param("ReservedConcurrentExecutions", None) resp = self.backend.put_function_concurrency(path_function_name, concurrency) return 200, {}, json.dumps({"ReservedConcurrentExecutions": resp}) def _list_layers(self) -> TYPE_RESPONSE: layers = self.backend.list_layers() return 200, {}, json.dumps({"Layers": layers}) def _delete_layer_version( self, layer_name: str, layer_version: str ) -> TYPE_RESPONSE: self.backend.delete_layer_version(layer_name, layer_version) return 200, {}, "{}" def _get_layer_version(self, layer_name: str, layer_version: str) -> TYPE_RESPONSE: layer = self.backend.get_layer_version(layer_name, layer_version) return 200, {}, json.dumps(layer.get_layer_version()) def _get_layer_versions(self) -> TYPE_RESPONSE: layer_name = self.path.rsplit("/", 2)[-2] layer_versions = self.backend.get_layer_versions(layer_name) return ( 200, {}, json.dumps( {"LayerVersions": [lv.get_layer_version() for lv in layer_versions]} ), ) def _publish_layer_version(self) -> TYPE_RESPONSE: spec = self.json_body if "LayerName" not in spec: spec["LayerName"] = self.path.rsplit("/", 2)[-2] layer_version = self.backend.publish_layer_version(spec) config = layer_version.get_layer_version() return 201, {}, json.dumps(config) def _create_alias(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/", 2)[-2]) params = json.loads(self.body) alias_name = params.get("Name") description = params.get("Description", "") function_version = params.get("FunctionVersion") routing_config = params.get("RoutingConfig") alias = self.backend.create_alias( name=alias_name, function_name=function_name, function_version=function_version, description=description, routing_config=routing_config, ) return 201, {}, json.dumps(alias.to_json()) def _delete_alias(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/")[-3]) alias_name = unquote(self.path.rsplit("/", 2)[-1]) self.backend.delete_alias(name=alias_name, function_name=function_name) return 201, {}, "{}" def _get_alias(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/")[-3]) alias_name = unquote(self.path.rsplit("/", 2)[-1]) alias = self.backend.get_alias(name=alias_name, function_name=function_name) return 201, {}, json.dumps(alias.to_json()) def _update_alias(self) -> TYPE_RESPONSE: function_name = unquote(self.path.rsplit("/")[-3]) alias_name = unquote(self.path.rsplit("/", 2)[-1]) params = json.loads(self.body) description = params.get("Description") function_version = params.get("FunctionVersion") routing_config = params.get("RoutingConfig") alias = self.backend.update_alias( name=alias_name, function_name=function_name, function_version=function_version, description=description, routing_config=routing_config, ) return 201, {}, json.dumps(alias.to_json())