AWSLambda:add_permission() now properly handles the PrincipalOrgID-parameter (#5511)

This commit is contained in:
Bert Blommers 2022-10-01 21:08:20 +00:00 committed by GitHub
parent 3679521d76
commit 2c2db272f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 3 deletions

View File

@ -1730,7 +1730,7 @@ class LambdaBackend(BaseBackend):
def add_permission(self, function_name, qualifier, raw):
fn = self.get_function(function_name, qualifier)
fn.policy.add_statement(raw, qualifier)
return fn.policy.add_statement(raw, qualifier)
def remove_permission(self, function_name, sid, revision=""):
fn = self.get_function(function_name)

View File

@ -46,6 +46,7 @@ class Policy:
)
self.statements.append(policy.statements[0])
self.revision = str(mock_random.uuid4())
return policy.statements[0]
# removes the statement that matches 'sid' from the policy
def del_statement(self, sid, revision=""):
@ -85,6 +86,9 @@ class Policy:
self.transform_property(
obj, "SourceAccount", "SourceAccount", self.source_account_formatter
)
self.transform_property(
obj, "PrincipalOrgID", "Condition", self.principal_org_id_formatter
)
# remove RevisionId and EventSourceToken if they are set
self.remove_if_set(obj, ["RevisionId", "EventSourceToken"])
@ -118,6 +122,9 @@ class Policy:
def source_arn_formatter(self, obj):
return {"ArnLike": {"AWS:SourceArn": obj}}
def principal_org_id_formatter(self, obj):
return {"StringEquals": {"aws:PrincipalOrgID": obj}}
def transform_property(self, obj, old_name, new_name, formatter):
if old_name in obj:
obj[new_name] = formatter(obj[old_name])

View File

@ -207,8 +207,8 @@ class LambdaResponse(BaseResponse):
function_name = unquote(path.split("/")[-2])
qualifier = self.querystring.get("Qualifier", [None])[0]
statement = self.body
self.backend.add_permission(function_name, qualifier, statement)
return 200, {}, json.dumps({"Statement": statement})
statement = self.backend.add_permission(function_name, qualifier, statement)
return 200, {}, json.dumps({"Statement": json.dumps(statement)})
def _get_policy(self, request):
path = request.path if hasattr(request, "path") else path_url(request.url)

View File

@ -41,6 +41,45 @@ def test_add_function_permission(key):
assert "Statement" in response
res = json.loads(response["Statement"])
assert res["Action"] == "lambda:InvokeFunction"
res["Condition"].should.equal(
{
"ArnLike": {
"AWS:SourceArn": "arn:aws:lambda:us-west-2:account-id:function:helloworld"
}
}
)
@mock_lambda
def test_add_permission_with_principalorgid():
conn = boto3.client("lambda", _lambda_region)
zip_content = get_test_zip_file1()
function_name = str(uuid4())[0:6]
fn_arn = conn.create_function(
FunctionName=function_name,
Runtime="python2.7",
Role=(get_role_name()),
Handler="lambda_function.handler",
Code={"ZipFile": zip_content},
)["FunctionArn"]
source_arn = "arn:aws:lambda:us-west-2:account-id:function:helloworld"
response = conn.add_permission(
FunctionName=fn_arn,
StatementId="1",
Action="lambda:InvokeFunction",
Principal="432143214321",
PrincipalOrgID="o-a1b2c3d4e5",
SourceArn=source_arn,
)
assert "Statement" in response
res = json.loads(response["Statement"])
res["Condition"].should.have.key("StringEquals").equals(
{"aws:PrincipalOrgID": "o-a1b2c3d4e5"}
)
res["Condition"].should.have.key("ArnLike").equals({"AWS:SourceArn": source_arn})
res.shouldnt.have.key("PrincipalOrgID")
@pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"])