AWSLambda - Allow ARN as parameter (#3740)
This commit is contained in:
parent
a1e415ec03
commit
cc50acc8b5
@ -970,7 +970,9 @@ class LambdaStorage(object):
|
||||
fn.policy = Policy(fn)
|
||||
self._arns[fn.function_arn] = fn
|
||||
|
||||
def publish_function(self, name):
|
||||
def publish_function(self, name_or_arn):
|
||||
function = self.get_function_by_name_or_arn(name_or_arn)
|
||||
name = function.function_name
|
||||
if name not in self._functions:
|
||||
return None
|
||||
if not self._functions[name]["latest"]:
|
||||
|
@ -170,7 +170,7 @@ class LambdaResponse(BaseResponse):
|
||||
|
||||
def _add_policy(self, request, full_url, headers):
|
||||
path = request.path if hasattr(request, "path") else path_url(request.url)
|
||||
function_name = path.split("/")[-2]
|
||||
function_name = unquote(path.split("/")[-2])
|
||||
if self.lambda_backend.get_function(function_name):
|
||||
statement = self.body
|
||||
self.lambda_backend.add_permission(function_name, statement)
|
||||
@ -180,7 +180,7 @@ class LambdaResponse(BaseResponse):
|
||||
|
||||
def _get_policy(self, request, full_url, headers):
|
||||
path = request.path if hasattr(request, "path") else path_url(request.url)
|
||||
function_name = path.split("/")[-2]
|
||||
function_name = unquote(path.split("/")[-2])
|
||||
if self.lambda_backend.get_function(function_name):
|
||||
out = self.lambda_backend.get_policy_wire_format(function_name)
|
||||
return 200, {}, out
|
||||
@ -189,7 +189,7 @@ class LambdaResponse(BaseResponse):
|
||||
|
||||
def _del_policy(self, request, full_url, headers, querystring):
|
||||
path = request.path if hasattr(request, "path") else path_url(request.url)
|
||||
function_name = path.split("/")[-3]
|
||||
function_name = unquote(path.split("/")[-3])
|
||||
statement_id = path.split("/")[-1].split("?")[0]
|
||||
revision = querystring.get("RevisionId", "")
|
||||
if self.lambda_backend.get_function(function_name):
|
||||
@ -225,7 +225,7 @@ class LambdaResponse(BaseResponse):
|
||||
def _invoke_async(self, request, full_url):
|
||||
response_headers = {}
|
||||
|
||||
function_name = self.path.rsplit("/", 3)[-3]
|
||||
function_name = unquote(self.path.rsplit("/", 3)[-3])
|
||||
|
||||
fn = self.lambda_backend.get_function(function_name, None)
|
||||
if fn:
|
||||
@ -386,7 +386,7 @@ class LambdaResponse(BaseResponse):
|
||||
return 404, {}, "{}"
|
||||
|
||||
def _put_configuration(self, request):
|
||||
function_name = self.path.rsplit("/", 2)[-2]
|
||||
function_name = unquote(self.path.rsplit("/", 2)[-2])
|
||||
qualifier = self._get_param("Qualifier", None)
|
||||
resp = self.lambda_backend.update_function_configuration(
|
||||
function_name, qualifier, body=self.json_body
|
||||
@ -398,7 +398,7 @@ class LambdaResponse(BaseResponse):
|
||||
return 404, {}, "{}"
|
||||
|
||||
def _put_code(self):
|
||||
function_name = self.path.rsplit("/", 2)[-2]
|
||||
function_name = unquote(self.path.rsplit("/", 2)[-2])
|
||||
qualifier = self._get_param("Qualifier", None)
|
||||
resp = self.lambda_backend.update_function_code(
|
||||
function_name, qualifier, body=self.json_body
|
||||
@ -410,7 +410,7 @@ class LambdaResponse(BaseResponse):
|
||||
return 404, {}, "{}"
|
||||
|
||||
def _get_function_concurrency(self, request):
|
||||
path_function_name = self.path.rsplit("/", 2)[-2]
|
||||
path_function_name = unquote(self.path.rsplit("/", 2)[-2])
|
||||
function_name = self.lambda_backend.get_function(path_function_name)
|
||||
|
||||
if function_name is None:
|
||||
@ -420,7 +420,7 @@ class LambdaResponse(BaseResponse):
|
||||
return 200, {}, json.dumps({"ReservedConcurrentExecutions": resp})
|
||||
|
||||
def _delete_function_concurrency(self, request):
|
||||
path_function_name = self.path.rsplit("/", 2)[-2]
|
||||
path_function_name = unquote(self.path.rsplit("/", 2)[-2])
|
||||
function_name = self.lambda_backend.get_function(path_function_name)
|
||||
|
||||
if function_name is None:
|
||||
@ -431,7 +431,7 @@ class LambdaResponse(BaseResponse):
|
||||
return 204, {}, "{}"
|
||||
|
||||
def _put_function_concurrency(self, request):
|
||||
path_function_name = self.path.rsplit("/", 2)[-2]
|
||||
path_function_name = unquote(self.path.rsplit("/", 2)[-2])
|
||||
function = self.lambda_backend.get_function(path_function_name)
|
||||
|
||||
if function is None:
|
||||
|
@ -13,13 +13,13 @@ url_paths = {
|
||||
r"{0}/(?P<api_version>[^/]+)/event-source-mappings/(?P<UUID>[\w_-]+)/?$": response.event_source_mapping,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invocations/?$": response.invoke,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<resource_arn>.+)/invocations/?$": response.invoke,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invoke-async/?$": response.invoke_async,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_:%-]+)/invoke-async/?$": response.invoke_async,
|
||||
r"{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)": response.tag,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/policy/(?P<statement_id>[\w_-]+)$": response.policy,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/policy/?$": response.policy,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/configuration/?$": response.configuration,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/code/?$": response.code,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/concurrency/?$": response.function_concurrency,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_:%-]+)/policy/(?P<statement_id>[\w_-]+)$": response.policy,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_:%-]+)/policy/?$": response.policy,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_:%-]+)/configuration/?$": response.configuration,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_:%-]+)/code/?$": response.code,
|
||||
r"{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_:%-]+)/concurrency/?$": response.function_concurrency,
|
||||
r"{0}/(?P<api_version>[^/]+)/layers/?$": response.list_layers,
|
||||
r"{0}/(?P<api_version>[^/]+)/layers/(?P<layer_name>[\w_-]+)/versions/?$": response.layers_versions,
|
||||
}
|
||||
|
@ -127,10 +127,11 @@ def test_list_functions():
|
||||
|
||||
@pytest.mark.network
|
||||
@pytest.mark.parametrize("invocation_type", [None, "RequestResponse"])
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_invoke_requestresponse_function(invocation_type):
|
||||
def test_invoke_requestresponse_function(invocation_type, key):
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
conn.create_function(
|
||||
fxn = conn.create_function(
|
||||
FunctionName="testFunction",
|
||||
Runtime="python2.7",
|
||||
Role=get_role_name(),
|
||||
@ -141,6 +142,7 @@ def test_invoke_requestresponse_function(invocation_type):
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = fxn[key]
|
||||
|
||||
# Only add invocation-type keyword-argument when provided, otherwise the request
|
||||
# fails to be validated
|
||||
@ -150,7 +152,7 @@ def test_invoke_requestresponse_function(invocation_type):
|
||||
|
||||
in_data = {"msg": "So long and thanks for all the fish"}
|
||||
success_result = conn.invoke(
|
||||
FunctionName="testFunction", Payload=json.dumps(in_data), LogType="Tail", **kw
|
||||
FunctionName=name_or_arn, Payload=json.dumps(in_data), LogType="Tail", **kw
|
||||
)
|
||||
|
||||
if "FunctionError" in success_result:
|
||||
@ -171,7 +173,7 @@ def test_invoke_requestresponse_function(invocation_type):
|
||||
|
||||
# Logs should not be returned by default, only when the LogType-param is supplied
|
||||
success_result = conn.invoke(
|
||||
FunctionName="testFunction", Payload=json.dumps(in_data), **kw
|
||||
FunctionName=name_or_arn, Payload=json.dumps(in_data), **kw
|
||||
)
|
||||
|
||||
success_result["StatusCode"].should.equal(200)
|
||||
@ -181,39 +183,6 @@ def test_invoke_requestresponse_function(invocation_type):
|
||||
assert "LogResult" not in success_result
|
||||
|
||||
|
||||
@pytest.mark.network
|
||||
@mock_lambda
|
||||
def test_invoke_requestresponse_function_with_arn():
|
||||
from moto.awslambda.models import ACCOUNT_ID
|
||||
|
||||
conn = boto3.client("lambda", "us-west-2")
|
||||
conn.create_function(
|
||||
FunctionName="testFunction",
|
||||
Runtime="python2.7",
|
||||
Role=get_role_name(),
|
||||
Handler="lambda_function.lambda_handler",
|
||||
Code={"ZipFile": get_test_zip_file1()},
|
||||
Description="test lambda function",
|
||||
Timeout=3,
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
|
||||
in_data = {"msg": "So long and thanks for all the fish"}
|
||||
success_result = conn.invoke(
|
||||
FunctionName="arn:aws:lambda:us-west-2:{}:function:testFunction".format(
|
||||
ACCOUNT_ID
|
||||
),
|
||||
InvocationType="RequestResponse",
|
||||
Payload=json.dumps(in_data),
|
||||
)
|
||||
|
||||
success_result["StatusCode"].should.equal(200)
|
||||
|
||||
payload = success_result["Payload"].read().decode("utf-8")
|
||||
json.loads(payload).should.equal(in_data)
|
||||
|
||||
|
||||
@pytest.mark.network
|
||||
@mock_lambda
|
||||
def test_invoke_event_function():
|
||||
@ -576,10 +545,11 @@ def test_get_function():
|
||||
conn.get_function(FunctionName="junk", Qualifier="$LATEST")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
@mock_s3
|
||||
@freeze_time("2015-01-01 00:00:00")
|
||||
def test_get_function_configuration():
|
||||
def test_get_function_configuration(key):
|
||||
s3_conn = boto3.client("s3", _lambda_region)
|
||||
s3_conn.create_bucket(
|
||||
Bucket="test-bucket",
|
||||
@ -590,7 +560,7 @@ def test_get_function_configuration():
|
||||
s3_conn.put_object(Bucket="test-bucket", Key="test.zip", Body=zip_content)
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
|
||||
conn.create_function(
|
||||
fxn = conn.create_function(
|
||||
FunctionName="testFunction",
|
||||
Runtime="python2.7",
|
||||
Role=get_role_name(),
|
||||
@ -602,8 +572,9 @@ def test_get_function_configuration():
|
||||
Publish=True,
|
||||
Environment={"Variables": {"test_variable": "test_value"}},
|
||||
)
|
||||
name_or_arn = fxn[key]
|
||||
|
||||
result = conn.get_function_configuration(FunctionName="testFunction")
|
||||
result = conn.get_function_configuration(FunctionName=name_or_arn)
|
||||
|
||||
result["CodeSha256"].should.equal(hashlib.sha256(zip_content).hexdigest())
|
||||
result["CodeSize"].should.equal(len(zip_content))
|
||||
@ -623,7 +594,7 @@ def test_get_function_configuration():
|
||||
|
||||
# Test get function with qualifier
|
||||
result = conn.get_function_configuration(
|
||||
FunctionName="testFunction", Qualifier="$LATEST"
|
||||
FunctionName=name_or_arn, Qualifier="$LATEST"
|
||||
)
|
||||
result["Version"].should.equal("$LATEST")
|
||||
result["FunctionArn"].should.equal(
|
||||
@ -983,10 +954,11 @@ def test_tags_not_found():
|
||||
|
||||
|
||||
@pytest.mark.network
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_invoke_async_function():
|
||||
def test_invoke_async_function(key):
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
conn.create_function(
|
||||
fxn = conn.create_function(
|
||||
FunctionName="testFunction",
|
||||
Runtime="python2.7",
|
||||
Role=get_role_name(),
|
||||
@ -997,9 +969,10 @@ def test_invoke_async_function():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = fxn[key]
|
||||
|
||||
success_result = conn.invoke_async(
|
||||
FunctionName="testFunction", InvokeArgs=json.dumps({"test": "event"})
|
||||
FunctionName=name_or_arn, InvokeArgs=json.dumps({"test": "event"})
|
||||
)
|
||||
|
||||
success_result["Status"].should.equal(202)
|
||||
@ -1053,11 +1026,15 @@ def test_get_function_created_with_zipfile():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_add_function_permission():
|
||||
def test_add_function_permission(key):
|
||||
"""
|
||||
Parametrized to ensure that we can add permission by using the FunctionName and the FunctionArn
|
||||
"""
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
zip_content = get_test_zip_file1()
|
||||
conn.create_function(
|
||||
f = conn.create_function(
|
||||
FunctionName="testFunction",
|
||||
Runtime="python2.7",
|
||||
Role=(get_role_name()),
|
||||
@ -1068,9 +1045,10 @@ def test_add_function_permission():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = f[key]
|
||||
|
||||
response = conn.add_permission(
|
||||
FunctionName="testFunction",
|
||||
FunctionName=name_or_arn,
|
||||
StatementId="1",
|
||||
Action="lambda:InvokeFunction",
|
||||
Principal="432143214321",
|
||||
@ -1084,11 +1062,12 @@ def test_add_function_permission():
|
||||
assert res["Action"] == "lambda:InvokeFunction"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_get_function_policy():
|
||||
def test_get_function_policy(key):
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
zip_content = get_test_zip_file1()
|
||||
conn.create_function(
|
||||
f = conn.create_function(
|
||||
FunctionName="testFunction",
|
||||
Runtime="python2.7",
|
||||
Role=get_role_name(),
|
||||
@ -1099,9 +1078,10 @@ def test_get_function_policy():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = f[key]
|
||||
|
||||
response = conn.add_permission(
|
||||
FunctionName="testFunction",
|
||||
conn.add_permission(
|
||||
FunctionName=name_or_arn,
|
||||
StatementId="1",
|
||||
Action="lambda:InvokeFunction",
|
||||
Principal="432143214321",
|
||||
@ -1111,7 +1091,7 @@ def test_get_function_policy():
|
||||
Qualifier="2",
|
||||
)
|
||||
|
||||
response = conn.get_policy(FunctionName="testFunction")
|
||||
response = conn.get_policy(FunctionName=name_or_arn)
|
||||
|
||||
assert "Policy" in response
|
||||
res = json.loads(response["Policy"])
|
||||
@ -1256,10 +1236,11 @@ def test_create_event_source_mapping():
|
||||
|
||||
|
||||
@pytest.mark.network
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_logs
|
||||
@mock_lambda
|
||||
@mock_sqs
|
||||
def test_invoke_function_from_sqs():
|
||||
def test_invoke_function_from_sqs(key):
|
||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||
queue = sqs.create_queue(QueueName="test-sqs-queue1")
|
||||
|
||||
@ -1275,9 +1256,10 @@ def test_invoke_function_from_sqs():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = func[key]
|
||||
|
||||
response = conn.create_event_source_mapping(
|
||||
EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"]
|
||||
EventSourceArn=queue.attributes["QueueArn"], FunctionName=name_or_arn
|
||||
)
|
||||
|
||||
assert response["EventSourceArn"] == queue.attributes["QueueArn"]
|
||||
@ -1631,9 +1613,10 @@ def test_delete_event_source_mapping():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
@mock_s3
|
||||
def test_update_configuration():
|
||||
def test_update_configuration(key):
|
||||
s3_conn = boto3.client("s3", _lambda_region)
|
||||
s3_conn.create_bucket(
|
||||
Bucket="test-bucket",
|
||||
@ -1656,6 +1639,7 @@ def test_update_configuration():
|
||||
Publish=True,
|
||||
Environment={"Variables": {"test_old_environment": "test_old_value"}},
|
||||
)
|
||||
name_or_arn = fxn[key]
|
||||
|
||||
assert fxn["Description"] == "test lambda function"
|
||||
assert fxn["Handler"] == "lambda_function.lambda_handler"
|
||||
@ -1664,7 +1648,7 @@ def test_update_configuration():
|
||||
assert fxn["Timeout"] == 3
|
||||
|
||||
updated_config = conn.update_function_configuration(
|
||||
FunctionName="testFunction",
|
||||
FunctionName=name_or_arn,
|
||||
Description="updated test lambda function",
|
||||
Handler="lambda_function.new_lambda_handler",
|
||||
Runtime="python3.6",
|
||||
@ -1689,8 +1673,9 @@ def test_update_configuration():
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_update_function_zip():
|
||||
def test_update_function_zip(key):
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
|
||||
zip_content_one = get_test_zip_file1()
|
||||
@ -1706,11 +1691,12 @@ def test_update_function_zip():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = fxn[key]
|
||||
|
||||
zip_content_two = get_test_zip_file2()
|
||||
|
||||
fxn_updated = conn.update_function_code(
|
||||
FunctionName="testFunctionZip", ZipFile=zip_content_two, Publish=True
|
||||
conn.update_function_code(
|
||||
FunctionName=name_or_arn, ZipFile=zip_content_two, Publish=True
|
||||
)
|
||||
|
||||
response = conn.get_function(FunctionName="testFunctionZip", Qualifier="2")
|
||||
@ -1837,11 +1823,12 @@ def test_create_function_with_unknown_arn():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_remove_function_permission():
|
||||
def test_remove_function_permission(key):
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
zip_content = get_test_zip_file1()
|
||||
conn.create_function(
|
||||
f = conn.create_function(
|
||||
FunctionName="testFunction",
|
||||
Runtime="python2.7",
|
||||
Role=(get_role_name()),
|
||||
@ -1852,9 +1839,10 @@ def test_remove_function_permission():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = f[key]
|
||||
|
||||
conn.add_permission(
|
||||
FunctionName="testFunction",
|
||||
FunctionName=name_or_arn,
|
||||
StatementId="1",
|
||||
Action="lambda:InvokeFunction",
|
||||
Principal="432143214321",
|
||||
@ -1865,21 +1853,22 @@ def test_remove_function_permission():
|
||||
)
|
||||
|
||||
remove = conn.remove_permission(
|
||||
FunctionName="testFunction", StatementId="1", Qualifier="2",
|
||||
FunctionName=name_or_arn, StatementId="1", Qualifier="2",
|
||||
)
|
||||
remove["ResponseMetadata"]["HTTPStatusCode"].should.equal(204)
|
||||
policy = conn.get_policy(FunctionName="testFunction", Qualifier="2")["Policy"]
|
||||
policy = conn.get_policy(FunctionName=name_or_arn, Qualifier="2")["Policy"]
|
||||
policy = json.loads(policy)
|
||||
policy["Statement"].should.equal([])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_put_function_concurrency():
|
||||
def test_put_function_concurrency(key):
|
||||
expected_concurrency = 15
|
||||
function_name = "test"
|
||||
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
conn.create_function(
|
||||
f = conn.create_function(
|
||||
FunctionName=function_name,
|
||||
Runtime="python3.8",
|
||||
Role=(get_role_name()),
|
||||
@ -1890,19 +1879,21 @@ def test_put_function_concurrency():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = f[key]
|
||||
result = conn.put_function_concurrency(
|
||||
FunctionName=function_name, ReservedConcurrentExecutions=expected_concurrency
|
||||
FunctionName=name_or_arn, ReservedConcurrentExecutions=expected_concurrency
|
||||
)
|
||||
|
||||
result["ReservedConcurrentExecutions"].should.equal(expected_concurrency)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_delete_function_concurrency():
|
||||
def test_delete_function_concurrency(key):
|
||||
function_name = "test"
|
||||
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
conn.create_function(
|
||||
f = conn.create_function(
|
||||
FunctionName=function_name,
|
||||
Runtime="python3.8",
|
||||
Role=(get_role_name()),
|
||||
@ -1913,23 +1904,25 @@ def test_delete_function_concurrency():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = f[key]
|
||||
conn.put_function_concurrency(
|
||||
FunctionName=function_name, ReservedConcurrentExecutions=15
|
||||
FunctionName=name_or_arn, ReservedConcurrentExecutions=15
|
||||
)
|
||||
|
||||
conn.delete_function_concurrency(FunctionName=function_name)
|
||||
conn.delete_function_concurrency(FunctionName=name_or_arn)
|
||||
result = conn.get_function(FunctionName=function_name)
|
||||
|
||||
result.doesnt.have.key("Concurrency")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])
|
||||
@mock_lambda
|
||||
def test_get_function_concurrency():
|
||||
def test_get_function_concurrency(key):
|
||||
expected_concurrency = 15
|
||||
function_name = "test"
|
||||
|
||||
conn = boto3.client("lambda", _lambda_region)
|
||||
conn.create_function(
|
||||
f = conn.create_function(
|
||||
FunctionName=function_name,
|
||||
Runtime="python3.8",
|
||||
Role=(get_role_name()),
|
||||
@ -1940,11 +1933,12 @@ def test_get_function_concurrency():
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)
|
||||
name_or_arn = f[key]
|
||||
conn.put_function_concurrency(
|
||||
FunctionName=function_name, ReservedConcurrentExecutions=expected_concurrency
|
||||
FunctionName=name_or_arn, ReservedConcurrentExecutions=expected_concurrency
|
||||
)
|
||||
|
||||
result = conn.get_function_concurrency(FunctionName=function_name)
|
||||
result = conn.get_function_concurrency(FunctionName=name_or_arn)
|
||||
|
||||
result["ReservedConcurrentExecutions"].should.equal(expected_concurrency)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user