AWSLambda - refactor exception handling

This commit is contained in:
Bert Blommers 2019-11-17 10:59:20 +00:00
parent ad23b65784
commit c6dd3c96ea
4 changed files with 56 additions and 42 deletions

View File

@ -0,0 +1,31 @@
from botocore.client import ClientError
class LambdaClientError(ClientError):
def __init__(self, error, message):
error_response = {"Error": {"Code": error, "Message": message}}
super(LambdaClientError, self).__init__(error_response, None)
class CrossAccountNotAllowed(LambdaClientError):
def __init__(self):
super(CrossAccountNotAllowed, self).__init__(
"AccessDeniedException", "Cross-account pass role is not allowed."
)
class InvalidParameterValueException(LambdaClientError):
def __init__(self, message):
super(InvalidParameterValueException, self).__init__(
"InvalidParameterValueException", message
)
class InvalidRoleFormat(LambdaClientError):
pattern = r"arn:(aws[a-zA-Z-]*)?:iam::(\d{12}):role/?[a-zA-Z_0-9+=,.@\-_/]+"
def __init__(self, role):
message = "1 validation error detected: Value '{0}' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: {1}".format(
role, InvalidRoleFormat.pattern
)
super(InvalidRoleFormat, self).__init__("ValidationException", message)

View File

@ -33,6 +33,11 @@ from moto.s3.models import s3_backend
from moto.logs.models import logs_backends from moto.logs.models import logs_backends
from moto.s3.exceptions import MissingBucket, MissingKey from moto.s3.exceptions import MissingBucket, MissingKey
from moto import settings from moto import settings
from .exceptions import (
CrossAccountNotAllowed,
InvalidRoleFormat,
InvalidParameterValueException,
)
from .utils import make_function_arn, make_function_ver_arn from .utils import make_function_arn, make_function_ver_arn
from moto.sqs import sqs_backends from moto.sqs import sqs_backends
from moto.dynamodb2 import dynamodb_backends2 from moto.dynamodb2 import dynamodb_backends2
@ -215,9 +220,8 @@ class LambdaFunction(BaseModel):
key = s3_backend.get_key(self.code["S3Bucket"], self.code["S3Key"]) key = s3_backend.get_key(self.code["S3Bucket"], self.code["S3Key"])
except MissingBucket: except MissingBucket:
if do_validate_s3(): if do_validate_s3():
raise ValueError( raise InvalidParameterValueException(
"InvalidParameterValueException", "Error occurred while GetObject. S3 Error Code: NoSuchBucket. S3 Error Message: The specified bucket does not exist"
"Error occurred while GetObject. S3 Error Code: NoSuchBucket. S3 Error Message: The specified bucket does not exist",
) )
except MissingKey: except MissingKey:
if do_validate_s3(): if do_validate_s3():
@ -669,28 +673,19 @@ class LambdaStorage(object):
:param fn: Function :param fn: Function
:type fn: LambdaFunction :type fn: LambdaFunction
""" """
pattern = r"arn:(aws[a-zA-Z-]*)?:iam::(\d{12}):role/?[a-zA-Z_0-9+=,.@\-_/]+" valid_role = re.match(InvalidRoleFormat.pattern, fn.role)
valid_role = re.match(pattern, fn.role)
if valid_role: if valid_role:
account = valid_role.group(2) account = valid_role.group(2)
if account != ACCOUNT_ID: if account != ACCOUNT_ID:
raise ValueError( raise CrossAccountNotAllowed()
"AccessDeniedException", "Cross-account pass role is not allowed."
)
try: try:
iam_backend.get_role_by_arn(fn.role) iam_backend.get_role_by_arn(fn.role)
except IAMNotFoundException: except IAMNotFoundException:
raise ValueError( raise InvalidParameterValueException(
"InvalidParameterValueException", "The role defined for the function cannot be assumed by Lambda."
"The role defined for the function cannot be assumed by Lambda.",
) )
else: else:
raise ValueError( raise InvalidRoleFormat(fn.role)
"ValidationException",
"1 validation error detected: Value '{0}' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: {1}".format(
fn.role, pattern
),
)
if fn.function_name in self._functions: if fn.function_name in self._functions:
self._functions[fn.function_name]["latest"] = fn self._functions[fn.function_name]["latest"] = fn
else: else:

View File

@ -211,26 +211,14 @@ class LambdaResponse(BaseResponse):
return 200, {}, json.dumps(result) return 200, {}, json.dumps(result)
def _create_function(self, request, full_url, headers): def _create_function(self, request, full_url, headers):
try: fn = self.lambda_backend.create_function(self.json_body)
fn = self.lambda_backend.create_function(self.json_body) config = fn.get_configuration()
except ValueError as e: return 201, {}, json.dumps(config)
return 400, {}, json.dumps({"__type": e.args[0], "message": e.args[1]})
else:
config = fn.get_configuration()
return 201, {}, json.dumps(config)
def _create_event_source_mapping(self, request, full_url, headers): def _create_event_source_mapping(self, request, full_url, headers):
try: fn = self.lambda_backend.create_event_source_mapping(self.json_body)
fn = self.lambda_backend.create_event_source_mapping(self.json_body) config = fn.get_configuration()
except ValueError as e: return 201, {}, json.dumps(config)
return (
400,
{},
json.dumps({"Error": {"Code": e.args[0], "Message": e.args[1]}}),
)
else:
config = fn.get_configuration()
return 201, {}, json.dumps(config)
def _list_event_source_mappings(self, event_source_arn, function_name): def _list_event_source_mappings(self, event_source_arn, function_name):
esms = self.lambda_backend.list_event_source_mappings( esms = self.lambda_backend.list_event_source_mappings(

View File

@ -4,6 +4,7 @@ import os
import json import json
import boto import boto
import boto3
import boto.iam import boto.iam
import boto.s3 import boto.s3
import boto.s3.key import boto.s3.key
@ -19,9 +20,9 @@ from moto import (
mock_cloudformation_deprecated, mock_cloudformation_deprecated,
mock_s3_deprecated, mock_s3_deprecated,
mock_route53_deprecated, mock_route53_deprecated,
mock_iam,
) )
from moto.cloudformation import cloudformation_backends from moto.cloudformation import cloudformation_backends
from moto.iam import mock_iam_deprecated
dummy_template = { dummy_template = {
"AWSTemplateFormatVersion": "2010-09-09", "AWSTemplateFormatVersion": "2010-09-09",
@ -596,9 +597,8 @@ def test_create_stack_kinesis():
def get_role_name(): def get_role_name():
with mock_iam_deprecated(): with mock_iam():
iam = boto.connect_iam() iam = boto3.client("iam")
role = iam.create_role("my-role")["create_role_response"]["create_role_result"][ return iam.create_role(
"role" RoleName="TestRole", AssumeRolePolicyDocument="some_policy"
]["arn"] )["Role"]["Arn"]
return role