Implement IAM back end part of update_assume_role_policy to validate policies (#5340)

This commit is contained in:
Michael Merrill 2022-07-29 23:25:56 -04:00 committed by GitHub
parent d28c4bfb93
commit 23c4f47635
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 164 additions and 14 deletions

View File

@ -3290,7 +3290,7 @@
- [X] untag_user
- [X] update_access_key
- [X] update_account_password_policy
- [ ] update_assume_role_policy
- [X] update_assume_role_policy
- [X] update_group
- [X] update_login_profile
- [X] update_open_id_connect_provider_thumbprint

View File

@ -192,7 +192,7 @@ iam
- [X] untag_user
- [X] update_access_key
- [X] update_account_password_policy
- [ ] update_assume_role_policy
- [X] update_assume_role_policy
- [X] update_group
- [X] update_login_profile
- [X] update_open_id_connect_provider_thumbprint

View File

@ -21,7 +21,10 @@ from moto.core.utils import (
iso_8601_datetime_with_milliseconds,
BackendDict,
)
from moto.iam.policy_validation import IAMPolicyDocumentValidator
from moto.iam.policy_validation import (
IAMPolicyDocumentValidator,
IAMTrustPolicyDocumentValidator,
)
from moto.utilities.utils import md5_hash
from .aws_managed_policies import aws_managed_policies_data
@ -1851,6 +1854,12 @@ class IAMBackend(BaseBackend):
def get_roles(self):
return self.roles.values()
def update_assume_role_policy(self, role_name, policy_document):
role = self.get_role(role_name)
iam_policy_document_validator = IAMTrustPolicyDocumentValidator(policy_document)
iam_policy_document_validator.validate()
role.assume_role_policy_document = policy_document
def put_role_policy(self, role_name, policy_name, policy_json):
role = self.get_role(role_name)

View File

@ -83,7 +83,7 @@ VALID_RESOURCE_PATH_STARTING_VALUES = {
}
class IAMPolicyDocumentValidator:
class BaseIAMPolicyValidator:
def __init__(self, policy_document):
self._policy_document = policy_document
self._policy_json = {}
@ -117,10 +117,6 @@ class IAMPolicyDocumentValidator:
self._validate_action_like_exist()
except Exception:
raise MalformedPolicyDocument("Policy statement must contain actions.")
try:
self._validate_resource_exist()
except Exception:
raise MalformedPolicyDocument("Policy statement must contain resources.")
if self._resource_error != "":
raise MalformedPolicyDocument(self._resource_error)
@ -521,3 +517,55 @@ class IAMPolicyDocumentValidator:
if seconds_with_decimal_fraction_partition[1] == ".":
decimal_seconds = seconds_with_decimal_fraction_partition[2]
assert 0 <= int(decimal_seconds) <= 999999999
class IAMPolicyDocumentValidator(BaseIAMPolicyValidator):
def __init__(self, policy_document):
super().__init__(policy_document)
def validate(self):
super().validate()
try:
self._validate_resource_exist()
except Exception:
raise MalformedPolicyDocument("Policy statement must contain resources.")
class IAMTrustPolicyDocumentValidator(BaseIAMPolicyValidator):
def __init__(self, policy_document):
super().__init__(policy_document)
def validate(self):
super().validate()
try:
for statement in self._statements:
if isinstance(statement["Action"], str):
IAMTrustPolicyDocumentValidator._validate_trust_policy_action(
statement["Action"]
)
else:
for action in statement["Action"]:
IAMTrustPolicyDocumentValidator._validate_trust_policy_action(
action
)
except Exception:
raise MalformedPolicyDocument(
"Trust Policy statement actions can only be sts:AssumeRole, "
"sts:AssumeRoleWithSAML, and sts:AssumeRoleWithWebIdentity"
)
try:
self._validate_resource_not_exist()
except Exception:
raise MalformedPolicyDocument("Has prohibited field Resource.")
def _validate_resource_not_exist(self):
for statement in self._statements:
assert "Resource" not in statement and "NotResource" not in statement
@staticmethod
def _validate_trust_policy_action(action):
assert action in (
"sts:AssumeRole",
"sts:AssumeRoleWithSAML",
"sts:AssumeRoleWithWebIdentity",
)

View File

@ -255,8 +255,8 @@ class IamResponse(BaseResponse):
def update_assume_role_policy(self):
role_name = self._get_param("RoleName")
role = self.backend.get_role(role_name)
role.assume_role_policy_document = self._get_param("PolicyDocument")
policy_document = self._get_param("PolicyDocument")
self.backend.update_assume_role_policy(role_name, policy_document)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="UpdateAssumeRolePolicy")

View File

@ -379,14 +379,107 @@ def test_get_role_policy():
@mock_iam
def test_update_assume_role_policy():
def test_update_assume_role_invalid_policy():
conn = boto3.client("iam", region_name="us-east-1")
conn.create_role(
RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="my-path"
)
conn.update_assume_role_policy(RoleName="my-role", PolicyDocument="new policy")
with pytest.raises(ClientError) as ex:
conn.update_assume_role_policy(RoleName="my-role", PolicyDocument="new policy")
err = ex.value.response["Error"]
err["Code"].should.equal("MalformedPolicyDocument")
err["Message"].should.contain("Syntax errors in policy.")
@mock_iam
def test_update_assume_role_valid_policy():
conn = boto3.client("iam", region_name="us-east-1")
conn.create_role(
RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="my-path"
)
policy_document = """
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": ["ec2.amazonaws.com"]
},
"Action": ["sts:AssumeRole"]
}
]
}
"""
conn.update_assume_role_policy(RoleName="my-role", PolicyDocument=policy_document)
role = conn.get_role(RoleName="my-role")["Role"]
role["AssumeRolePolicyDocument"].should.equal("new policy")
role["AssumeRolePolicyDocument"]["Statement"][0]["Action"][0].should.equal(
"sts:AssumeRole"
)
@mock_iam
def test_update_assume_role_invalid_policy_bad_action():
conn = boto3.client("iam", region_name="us-east-1")
conn.create_role(
RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="my-path"
)
policy_document = """
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": ["ec2.amazonaws.com"]
},
"Action": ["sts:BadAssumeRole"]
}
]
}
"""
with pytest.raises(ClientError) as ex:
conn.update_assume_role_policy(
RoleName="my-role", PolicyDocument=policy_document
)
err = ex.value.response["Error"]
err["Code"].should.equal("MalformedPolicyDocument")
err["Message"].should.contain(
"Trust Policy statement actions can only be sts:AssumeRole, "
"sts:AssumeRoleWithSAML, and sts:AssumeRoleWithWebIdentity"
)
@mock_iam
def test_update_assume_role_invalid_policy_with_resource():
conn = boto3.client("iam", region_name="us-east-1")
conn.create_role(
RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="my-path"
)
policy_document = """
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": ["ec2.amazonaws.com"]
},
"Action": ["sts:AssumeRole"],
"Resource" : "arn:aws:s3:::example_bucket"
}
]
}
"""
with pytest.raises(ClientError) as ex:
conn.update_assume_role_policy(
RoleName="my-role", PolicyDocument=policy_document
)
err = ex.value.response["Error"]
err["Code"].should.equal("MalformedPolicyDocument")
err["Message"].should.contain("Has prohibited field Resource.")
@mock_iam

View File

@ -317,7 +317,7 @@ invalid_policy_document_test_cases = [
"Version": "2012-10-17",
"Statement": {"Effect": "Allow", "Action": "invalid"},
},
"error_message": "Policy statement must contain resources.",
"error_message": "Actions/Conditions must be prefaced by a vendor, e.g., iam, sdb, ec2, etc.",
},
{
"document": {