Lambda: delete_layer_version() now also accepts an ARN (#6409)

This commit is contained in:
Bert Blommers 2023-06-15 11:05:06 +00:00 committed by GitHub
parent 0247719554
commit 932b7a25d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 48 additions and 24 deletions

View File

@ -1,4 +1,5 @@
from moto.core.exceptions import JsonRESTError from moto.core.exceptions import JsonRESTError
from typing import Any
class LambdaClientError(JsonRESTError): class LambdaClientError(JsonRESTError):
@ -63,6 +64,16 @@ class UnknownLayerException(LambdaClientError):
super().__init__("ResourceNotFoundException", "Cannot find layer") super().__init__("ResourceNotFoundException", "Cannot find layer")
class UnknownLayerVersionException(LambdaClientError):
code = 404
def __init__(self, arns: Any) -> None:
super().__init__(
"ResourceNotFoundException",
f"One or more LayerVersion does not exist {arns}",
)
class UnknownPolicyException(LambdaClientError): class UnknownPolicyException(LambdaClientError):
code = 404 code = 404

View File

@ -41,6 +41,7 @@ from .exceptions import (
InvalidRoleFormat, InvalidRoleFormat,
InvalidParameterValueException, InvalidParameterValueException,
UnknownLayerException, UnknownLayerException,
UnknownLayerVersionException,
UnknownFunctionException, UnknownFunctionException,
UnknownAliasException, UnknownAliasException,
) )
@ -552,10 +553,7 @@ class LambdaFunction(CloudFormationModel, DockerModel):
for layer_version in layers_versions_arns for layer_version in layers_versions_arns
] ]
if not all(layer_versions): if not all(layer_versions):
raise ValueError( raise UnknownLayerVersionException(layers_versions_arns)
"InvalidParameterValueException",
f"One or more LayerVersion does not exist {layers_versions_arns}",
)
return [{"Arn": lv.arn, "CodeSize": lv.code_size} for lv in layer_versions] return [{"Arn": lv.arn, "CodeSize": lv.code_size} for lv in layer_versions]
def get_code_signing_config(self) -> Dict[str, Any]: def get_code_signing_config(self) -> Dict[str, Any]:
@ -1409,6 +1407,14 @@ class LayerStorage(object):
str, LambdaFunction str, LambdaFunction
] = weakref.WeakValueDictionary() ] = weakref.WeakValueDictionary()
def _find_layer_by_name_or_arn(self, name_or_arn: str) -> Layer:
if name_or_arn in self._layers:
return self._layers[name_or_arn]
for layer in self._layers.values():
if layer.layer_arn == name_or_arn:
return layer
raise UnknownLayerException()
def put_layer_version(self, layer_version: LayerVersion) -> None: def put_layer_version(self, layer_version: LayerVersion) -> None:
""" """
:param layer_version: LayerVersion :param layer_version: LayerVersion
@ -1423,12 +1429,12 @@ class LayerStorage(object):
] ]
def delete_layer_version(self, layer_name: str, layer_version: str) -> None: def delete_layer_version(self, layer_name: str, layer_version: str) -> None:
self._layers[layer_name].delete_version(layer_version) layer = self._find_layer_by_name_or_arn(layer_name)
layer.delete_version(layer_version)
def get_layer_version(self, layer_name: str, layer_version: str) -> LayerVersion: def get_layer_version(self, layer_name: str, layer_version: str) -> LayerVersion:
if layer_name not in self._layers: layer = self._find_layer_by_name_or_arn(layer_name)
raise UnknownLayerException() for lv in layer.layer_versions.values():
for lv in self._layers[layer_name].layer_versions.values():
if lv.version == int(layer_version): if lv.version == int(layer_version):
return lv return lv
raise UnknownLayerException() raise UnknownLayerException()

View File

@ -80,10 +80,12 @@ class LambdaResponse(BaseResponse):
def layers_version(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def layers_version(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
layer_name = unquote(self.path.split("/")[-3])
layer_version = self.path.split("/")[-1]
if request.method == "DELETE": if request.method == "DELETE":
return self._delete_layer_version() return self._delete_layer_version(layer_name, layer_version)
elif request.method == "GET": elif request.method == "GET":
return self._get_layer_version() return self._get_layer_version(layer_name, layer_version)
def layers_versions(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return] def layers_versions(self, request: Any, full_url: str, headers: Any) -> TYPE_RESPONSE: # type: ignore[return]
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
@ -492,17 +494,13 @@ class LambdaResponse(BaseResponse):
layers = self.backend.list_layers() layers = self.backend.list_layers()
return 200, {}, json.dumps({"Layers": layers}) return 200, {}, json.dumps({"Layers": layers})
def _delete_layer_version(self) -> TYPE_RESPONSE: def _delete_layer_version(
layer_name = self.path.split("/")[-3] self, layer_name: str, layer_version: str
layer_version = self.path.split("/")[-1] ) -> TYPE_RESPONSE:
self.backend.delete_layer_version(layer_name, layer_version) self.backend.delete_layer_version(layer_name, layer_version)
return 200, {}, "{}" return 200, {}, "{}"
def _get_layer_version(self) -> TYPE_RESPONSE: def _get_layer_version(self, layer_name: str, layer_version: str) -> TYPE_RESPONSE:
layer_name = self.path.split("/")[-3]
layer_version = self.path.split("/")[-1]
layer = self.backend.get_layer_version(layer_name, layer_version) layer = self.backend.get_layer_version(layer_name, layer_version)
return 200, {}, json.dumps(layer.get_layer_version()) return 200, {}, json.dumps(layer.get_layer_version())

View File

@ -27,7 +27,7 @@ url_paths = {
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_:%-]+)/url/?$": response.function_url_config, r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_:%-]+)/url/?$": response.function_url_config,
r"{0}/(?P<api_version>[^/]+)/layers$": response.list_layers, r"{0}/(?P<api_version>[^/]+)/layers$": response.list_layers,
r"{0}/(?P<api_version>[^/]+)/layers/$": response.list_layers, r"{0}/(?P<api_version>[^/]+)/layers/$": response.list_layers,
r"{0}/(?P<api_version>[^/]+)/layers/(?P<layer_name>[\w_-]+)/versions$": response.layers_versions, r"{0}/(?P<api_version>[^/]+)/layers/(?P<layer_name>.+)/versions$": response.layers_versions,
r"{0}/(?P<api_version>[^/]+)/layers/(?P<layer_name>[\w_-]+)/versions/$": response.layers_versions, r"{0}/(?P<api_version>[^/]+)/layers/(?P<layer_name>.+)/versions/$": response.layers_versions,
r"{0}/(?P<api_version>[^/]+)/layers/(?P<layer_name>[\w_-]+)/versions/(?P<layer_version>[\w_-]+)$": response.layers_version, r"{0}/(?P<api_version>[^/]+)/layers/(?P<layer_name>.+)/versions/(?P<layer_version>[\w_-]+)$": response.layers_version,
} }

View File

@ -115,7 +115,7 @@ def test_get_lambda_layers():
assert result["LayerVersions"] == [] assert result["LayerVersions"] == []
# Test create function with non existant layer version # Test create function with non existant layer version
with pytest.raises((ValueError, ClientError)): with pytest.raises(ClientError) as exc:
conn.create_function( conn.create_function(
FunctionName=function_name, FunctionName=function_name,
Runtime="python2.7", Runtime="python2.7",
@ -129,6 +129,8 @@ def test_get_lambda_layers():
Environment={"Variables": {"test_variable": "test_value"}}, Environment={"Variables": {"test_variable": "test_value"}},
Layers=[(expected_arn + "3")], Layers=[(expected_arn + "3")],
) )
err = exc.value.response["Error"]
assert err["Code"] == "ResourceNotFoundException"
@mock_lambda @mock_lambda
@ -200,7 +202,8 @@ def test_get_layer_version__unknown():
@mock_lambda @mock_lambda
@mock_s3 @mock_s3
def test_delete_layer_version(): @pytest.mark.parametrize("use_arn", [True, False])
def test_delete_layer_version(use_arn):
bucket_name = str(uuid4()) bucket_name = str(uuid4())
s3_conn = boto3.client("s3", _lambda_region) s3_conn = boto3.client("s3", _lambda_region)
s3_conn.create_bucket( s3_conn.create_bucket(
@ -219,9 +222,15 @@ def test_delete_layer_version():
CompatibleRuntimes=["python3.6"], CompatibleRuntimes=["python3.6"],
LicenseInfo="MIT", LicenseInfo="MIT",
) )
layer_arn = resp["LayerArn"]
layer_version = resp["Version"] layer_version = resp["Version"]
conn.delete_layer_version(LayerName=layer_name, VersionNumber=layer_version) if use_arn:
conn.get_layer_version(LayerName=layer_arn, VersionNumber=layer_version)
conn.delete_layer_version(LayerName=layer_arn, VersionNumber=layer_version)
else:
conn.get_layer_version(LayerName=layer_name, VersionNumber=layer_version)
conn.delete_layer_version(LayerName=layer_name, VersionNumber=layer_version)
result = conn.list_layer_versions(LayerName=layer_name)["LayerVersions"] result = conn.list_layer_versions(LayerName=layer_name)["LayerVersions"]
assert result == [] assert result == []