AWSLambda: enforce concurrency quota in put_function_concurrency()
(#6628)
This commit is contained in:
parent
05f34b9521
commit
824f22a794
@ -2034,6 +2034,31 @@ class LambdaBackend(BaseBackend):
|
|||||||
def put_function_concurrency(
|
def put_function_concurrency(
|
||||||
self, function_name: str, reserved_concurrency: str
|
self, function_name: str, reserved_concurrency: str
|
||||||
) -> str:
|
) -> str:
|
||||||
|
"""Establish concurrency limit/reservations for a function
|
||||||
|
|
||||||
|
Actual lambda restricts concurrency to 1000 (default) per region/account
|
||||||
|
across all functions; we approximate that behavior by summing across all
|
||||||
|
functions (hopefully all in the same account and region) and allowing the
|
||||||
|
caller to simulate an increased quota.
|
||||||
|
|
||||||
|
By default, no quota is enforced in order to preserve compatibility with
|
||||||
|
existing code that assumes it can do as many things as it likes. To model
|
||||||
|
actual AWS behavior, define the MOTO_LAMBDA_CONCURRENCY_QUOTA environment
|
||||||
|
variable prior to testing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
quota: Optional[str] = os.environ.get("MOTO_LAMBDA_CONCURRENCY_QUOTA")
|
||||||
|
if quota is not None:
|
||||||
|
# Enforce concurrency limits as described above
|
||||||
|
available = int(quota) - int(reserved_concurrency)
|
||||||
|
for fnx in self.list_functions():
|
||||||
|
if fnx.reserved_concurrency and fnx.function_name != function_name:
|
||||||
|
available -= int(fnx.reserved_concurrency)
|
||||||
|
if available < 100:
|
||||||
|
raise InvalidParameterValueException(
|
||||||
|
"Specified ReservedConcurrentExecutions for function decreases account's UnreservedConcurrentExecution below its minimum value of [100]."
|
||||||
|
)
|
||||||
|
|
||||||
fn = self.get_function(function_name)
|
fn = self.get_function(function_name)
|
||||||
fn.reserved_concurrency = reserved_concurrency
|
fn.reserved_concurrency = reserved_concurrency
|
||||||
return fn.reserved_concurrency
|
return fn.reserved_concurrency
|
||||||
|
@ -1636,3 +1636,151 @@ def test_get_role_name_utility_race_condition():
|
|||||||
assert len(errors) + len(roles) == num_threads
|
assert len(errors) + len(roles) == num_threads
|
||||||
assert roles.count(roles[0]) == len(roles)
|
assert roles.count(roles[0]) == len(roles)
|
||||||
assert len(errors) == 0
|
assert len(errors) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@mock_lambda
|
||||||
|
@mock.patch.dict(os.environ, {"MOTO_LAMBDA_CONCURRENCY_QUOTA": "1000"})
|
||||||
|
def test_put_function_concurrency_success():
|
||||||
|
if settings.TEST_SERVER_MODE:
|
||||||
|
raise SkipTest(
|
||||||
|
"Envars not easily set in server mode, feature off by default, skipping..."
|
||||||
|
)
|
||||||
|
conn = boto3.client("lambda", _lambda_region)
|
||||||
|
zip_content = get_test_zip_file1()
|
||||||
|
function_name = str(uuid4())[0:6]
|
||||||
|
conn.create_function(
|
||||||
|
FunctionName=function_name,
|
||||||
|
Runtime="python2.7",
|
||||||
|
Role=get_role_name(),
|
||||||
|
Handler="lambda_function.lambda_handler",
|
||||||
|
Code={"ZipFile": zip_content},
|
||||||
|
Description="test lambda function",
|
||||||
|
Timeout=3,
|
||||||
|
MemorySize=128,
|
||||||
|
Publish=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
response = conn.put_function_concurrency(
|
||||||
|
FunctionName=function_name, ReservedConcurrentExecutions=900
|
||||||
|
)
|
||||||
|
assert response["ReservedConcurrentExecutions"] == 900
|
||||||
|
|
||||||
|
|
||||||
|
@mock_lambda
|
||||||
|
@mock.patch.dict(os.environ, {"MOTO_LAMBDA_CONCURRENCY_QUOTA": "don't care"})
|
||||||
|
def test_put_function_concurrency_not_enforced():
|
||||||
|
del os.environ["MOTO_LAMBDA_CONCURRENCY_QUOTA"] # i.e. not set by user
|
||||||
|
conn = boto3.client("lambda", _lambda_region)
|
||||||
|
zip_content = get_test_zip_file1()
|
||||||
|
function_name = str(uuid4())[0:6]
|
||||||
|
conn.create_function(
|
||||||
|
FunctionName=function_name,
|
||||||
|
Runtime="python2.7",
|
||||||
|
Role=get_role_name(),
|
||||||
|
Handler="lambda_function.lambda_handler",
|
||||||
|
Code={"ZipFile": zip_content},
|
||||||
|
Description="test lambda function",
|
||||||
|
Timeout=3,
|
||||||
|
MemorySize=128,
|
||||||
|
Publish=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# This works, even though it normally would be disallowed by AWS
|
||||||
|
response = conn.put_function_concurrency(
|
||||||
|
FunctionName=function_name, ReservedConcurrentExecutions=901
|
||||||
|
)
|
||||||
|
assert response["ReservedConcurrentExecutions"] == 901
|
||||||
|
|
||||||
|
|
||||||
|
@mock_lambda
|
||||||
|
@mock.patch.dict(os.environ, {"MOTO_LAMBDA_CONCURRENCY_QUOTA": "1000"})
|
||||||
|
def test_put_function_concurrency_failure():
|
||||||
|
if settings.TEST_SERVER_MODE:
|
||||||
|
raise SkipTest(
|
||||||
|
"Envars not easily set in server mode, feature off by default, skipping..."
|
||||||
|
)
|
||||||
|
conn = boto3.client("lambda", _lambda_region)
|
||||||
|
zip_content = get_test_zip_file1()
|
||||||
|
function_name = str(uuid4())[0:6]
|
||||||
|
conn.create_function(
|
||||||
|
FunctionName=function_name,
|
||||||
|
Runtime="python2.7",
|
||||||
|
Role=get_role_name(),
|
||||||
|
Handler="lambda_function.lambda_handler",
|
||||||
|
Code={"ZipFile": zip_content},
|
||||||
|
Description="test lambda function",
|
||||||
|
Timeout=3,
|
||||||
|
MemorySize=128,
|
||||||
|
Publish=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
conn.put_function_concurrency(
|
||||||
|
FunctionName=function_name, ReservedConcurrentExecutions=901
|
||||||
|
)
|
||||||
|
|
||||||
|
assert exc.value.response["Error"]["Code"] == "InvalidParameterValueException"
|
||||||
|
|
||||||
|
# No reservation should have been set
|
||||||
|
response = conn.get_function_concurrency(FunctionName=function_name)
|
||||||
|
assert "ReservedConcurrentExecutions" not in response
|
||||||
|
|
||||||
|
|
||||||
|
@mock_lambda
|
||||||
|
@mock.patch.dict(os.environ, {"MOTO_LAMBDA_CONCURRENCY_QUOTA": "1000"})
|
||||||
|
def test_put_function_concurrency_i_can_has_math():
|
||||||
|
if settings.TEST_SERVER_MODE:
|
||||||
|
raise SkipTest(
|
||||||
|
"Envars not easily set in server mode, feature off by default, skipping..."
|
||||||
|
)
|
||||||
|
conn = boto3.client("lambda", _lambda_region)
|
||||||
|
zip_content = get_test_zip_file1()
|
||||||
|
function_name_1 = str(uuid4())[0:6]
|
||||||
|
function_name_2 = str(uuid4())[0:6]
|
||||||
|
conn.create_function(
|
||||||
|
FunctionName=function_name_1,
|
||||||
|
Runtime="python2.7",
|
||||||
|
Role=get_role_name(),
|
||||||
|
Handler="lambda_function.lambda_handler",
|
||||||
|
Code={"ZipFile": zip_content},
|
||||||
|
Description="test lambda function",
|
||||||
|
Timeout=3,
|
||||||
|
MemorySize=128,
|
||||||
|
Publish=True,
|
||||||
|
)
|
||||||
|
conn.create_function(
|
||||||
|
FunctionName=function_name_2,
|
||||||
|
Runtime="python2.7",
|
||||||
|
Role=get_role_name(),
|
||||||
|
Handler="lambda_function.lambda_handler",
|
||||||
|
Code={"ZipFile": zip_content},
|
||||||
|
Description="test lambda function",
|
||||||
|
Timeout=3,
|
||||||
|
MemorySize=128,
|
||||||
|
Publish=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
response = conn.put_function_concurrency(
|
||||||
|
FunctionName=function_name_1, ReservedConcurrentExecutions=600
|
||||||
|
)
|
||||||
|
assert response["ReservedConcurrentExecutions"] == 600
|
||||||
|
response = conn.put_function_concurrency(
|
||||||
|
FunctionName=function_name_2, ReservedConcurrentExecutions=100
|
||||||
|
)
|
||||||
|
assert response["ReservedConcurrentExecutions"] == 100
|
||||||
|
|
||||||
|
# Increasing function 1's limit should succeed, e.g. 700 + 100 <= 900
|
||||||
|
response = conn.put_function_concurrency(
|
||||||
|
FunctionName=function_name_1, ReservedConcurrentExecutions=700
|
||||||
|
)
|
||||||
|
assert response["ReservedConcurrentExecutions"] == 700
|
||||||
|
|
||||||
|
# Increasing function 2's limit should fail, e.g. 700 + 201 > 900
|
||||||
|
with pytest.raises(ClientError) as exc:
|
||||||
|
conn.put_function_concurrency(
|
||||||
|
FunctionName=function_name_2, ReservedConcurrentExecutions=201
|
||||||
|
)
|
||||||
|
|
||||||
|
assert exc.value.response["Error"]["Code"] == "InvalidParameterValueException"
|
||||||
|
response = conn.get_function_concurrency(FunctionName=function_name_2)
|
||||||
|
assert response["ReservedConcurrentExecutions"] == 100
|
||||||
|
Loading…
x
Reference in New Issue
Block a user