Add IAM Role and Policy to Config

This commit is contained in:
Nick Stocchero 2020-07-30 08:17:35 -06:00
parent 2e0e542efe
commit 7bc5b5c08f
6 changed files with 793 additions and 5 deletions

View File

@ -23,8 +23,8 @@ However, this will only work on resource types that have this enabled.
### Current enabled resource types: ### Current enabled resource types:
1. S3 1. S3 (all)
1. IAM (Role, Policy)
## Developer Guide ## Developer Guide

View File

@ -47,8 +47,8 @@ from moto.config.exceptions import (
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.s3.config import s3_account_public_access_block_query, s3_config_query from moto.s3.config import s3_account_public_access_block_query, s3_config_query
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
from moto.iam.config import role_config_query, policy_config_query
POP_STRINGS = [ POP_STRINGS = [
"capitalizeStart", "capitalizeStart",
@ -64,6 +64,8 @@ DEFAULT_PAGE_SIZE = 100
RESOURCE_MAP = { RESOURCE_MAP = {
"AWS::S3::Bucket": s3_config_query, "AWS::S3::Bucket": s3_config_query,
"AWS::S3::AccountPublicAccessBlock": s3_account_public_access_block_query, "AWS::S3::AccountPublicAccessBlock": s3_account_public_access_block_query,
"AWS::IAM::Role": role_config_query,
"AWS::IAM::Policy": policy_config_query,
} }

View File

@ -766,6 +766,27 @@ class ConfigQueryModel(object):
""" """
raise NotImplementedError() raise NotImplementedError()
def aggregate_regions(self, path, backend_region, resource_region):
"""
Returns a list of "region\1eresourcename" strings
"""
filter_region = backend_region or resource_region
if filter_region:
filter_resources = list(self.backends[filter_region].__dict__[path].keys())
return map(
lambda resource: "{}\1e{}".format(filter_region, resource),
filter_resources,
)
# If we don't have a filter region
ret = []
for region in self.backends:
this_region_resources = list(self.backends[region].__dict__[path].keys())
for resource in this_region_resources:
ret.append("{}\1e{}".format(region, resource))
return ret
class base_decorator(object): class base_decorator(object):
mock_backend = MockAWS mock_backend = MockAWS

173
moto/iam/config.py Normal file
View File

@ -0,0 +1,173 @@
import json
from moto.core.exceptions import InvalidNextTokenException
from moto.core.models import ConfigQueryModel
from moto.iam import iam_backends
class RoleConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
):
# For aggregation -- did we get both a resource ID and a resource name?
if resource_ids and resource_name:
# If the values are different, then return an empty list:
if resource_name not in resource_ids:
return [], None
role_list = self.aggregate_regions("roles", backend_region, resource_region)
if not role_list:
return [], None
# Pagination logic:
sorted_roles = sorted(role_list)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
# "Tokens" are region + \00 + resource ID.
if next_token not in sorted_roles:
raise InvalidNextTokenException()
start = sorted_roles.index(next_token)
# Get the list of items to collect:
role_list = sorted_roles[start : (start + limit)]
if len(sorted_roles) > (start + limit):
new_token = sorted_roles[start + limit]
return (
[
{
"type": "AWS::IAM::Role",
"id": role.split("\1e")[1],
"name": role.split("\1e")[1],
"region": role.split("\1e")[0],
}
for role in role_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
role = self.backends["global"].roles.get(resource_id, {})
if not role:
return
if resource_name and role.name != resource_name:
return
# Format the bucket to the AWS Config format:
config_data = role.to_config_dict()
# The 'configuration' field is also a JSON string:
config_data["configuration"] = json.dumps(config_data["configuration"])
# Supplementary config need all values converted to JSON strings if they are not strings already:
for field, value in config_data["supplementaryConfiguration"].items():
if not isinstance(value, str):
config_data["supplementaryConfiguration"][field] = json.dumps(value)
return config_data
class PolicyConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
):
# For aggregation -- did we get both a resource ID and a resource name?
if resource_ids and resource_name:
# If the values are different, then return an empty list:
if resource_name not in resource_ids:
return [], None
# We don't want to include AWS Managed Policies
policy_list = filter(
lambda policy: not policy.split("\1e")[1].startswith("arn:aws:iam::aws"),
self.aggregate_regions("managed_policies", backend_region, resource_region),
)
if not policy_list:
return [], None
# Pagination logic:
sorted_policies = sorted(policy_list)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
# "Tokens" are region + \00 + resource ID.
if next_token not in sorted_policies:
raise InvalidNextTokenException()
start = sorted_policies.index(next_token)
# Get the list of items to collect:
policy_list = sorted_policies[start : (start + limit)]
if len(sorted_policies) > (start + limit):
new_token = sorted_policies[start + limit]
return (
[
{
"type": "AWS::IAM::Policy",
"id": policy.split("\1e")[1],
"name": policy.split("\1e")[1],
"region": policy.split("\1e")[0],
}
for policy in policy_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
policy = self.backends["global"].managed_policies.get(resource_id, {})
if not policy:
return
if resource_name and policy.name != resource_name:
return
# Format the bucket to the AWS Config format:
config_data = policy.to_config_dict()
# The 'configuration' field is also a JSON string:
config_data["configuration"] = json.dumps(config_data["configuration"])
# Supplementary config need all values converted to JSON strings if they are not strings already:
for field, value in config_data["supplementaryConfiguration"].items():
if not isinstance(value, str):
config_data["supplementaryConfiguration"][field] = json.dumps(value)
return config_data
role_config_query = RoleConfigQuery(iam_backends)
policy_config_query = PolicyConfigQuery(iam_backends)

View File

@ -8,11 +8,13 @@ import sys
from datetime import datetime from datetime import datetime
import json import json
import re import re
import time
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from six.moves.urllib.parse import urlparse from six.moves.urllib.parse import urlparse
from six.moves.urllib import parse
from moto.core.exceptions import RESTError from moto.core.exceptions import RESTError
from moto.core import BaseBackend, BaseModel, ACCOUNT_ID, CloudFormationModel from moto.core import BaseBackend, BaseModel, ACCOUNT_ID, CloudFormationModel
from moto.core.utils import ( from moto.core.utils import (
@ -153,7 +155,7 @@ class OpenIDConnectProvider(BaseModel):
self._errors = [] self._errors = []
self._validate(url, thumbprint_list, client_id_list) self._validate(url, thumbprint_list, client_id_list)
parsed_url = urlparse(url) parsed_url = parse.urlparse(url)
self.url = parsed_url.netloc + parsed_url.path self.url = parsed_url.netloc + parsed_url.path
self.thumbprint_list = thumbprint_list self.thumbprint_list = thumbprint_list
self.client_id_list = client_id_list self.client_id_list = client_id_list
@ -201,7 +203,7 @@ class OpenIDConnectProvider(BaseModel):
self._raise_errors() self._raise_errors()
parsed_url = urlparse(url) parsed_url = parse.urlparse(url)
if not parsed_url.scheme or not parsed_url.netloc: if not parsed_url.scheme or not parsed_url.netloc:
raise ValidationError("Invalid Open ID Connect Provider URL") raise ValidationError("Invalid Open ID Connect Provider URL")
@ -265,6 +267,48 @@ class ManagedPolicy(Policy):
def arn(self): def arn(self):
return "arn:aws:iam::{0}:policy{1}{2}".format(ACCOUNT_ID, self.path, self.name) return "arn:aws:iam::{0}:policy{1}{2}".format(ACCOUNT_ID, self.path, self.name)
def to_config_dict(self):
return {
"version": "1.3",
"configurationItemCaptureTime": str(self.create_date),
"configurationItemStatus": "OK",
"configurationStateId": str(
int(time.mktime(self.create_date.timetuple()))
), # PY2 and 3 compatible
"arn": "arn:aws:iam::{}:policy/{}".format(ACCOUNT_ID, self.name),
"resourceType": "AWS::IAM::Policy",
"resourceId": self.id,
"resourceName": self.name,
"awsRegion": "global",
"availabilityZone": "Not Applicable",
"resourceCreationTime": str(self.create_date),
"configuration": {
"policyName": self.name,
"policyId": self.id,
"arn": "arn:aws:iam::{}:policy/{}".format(ACCOUNT_ID, self.name),
"path": self.path,
"defaultVersionId": self.default_version_id,
"attachmentCount": self.attachment_count,
"permissionsBoundaryUsageCount": 0,
"isAttachable": ManagedPolicy.is_attachable,
"description": self.description,
"createDate": str(self.create_date.isoformat()),
"updateDate": str(self.create_date.isoformat()),
"policyVersionList": list(
map(
lambda version: {
"document": parse.quote(version.document),
"versionId": version.version_id,
"isDefaultVersion": version.is_default,
"createDate": str(version.create_date),
},
self.versions,
)
),
},
"supplementaryConfiguration": {},
}
class AWSManagedPolicy(ManagedPolicy): class AWSManagedPolicy(ManagedPolicy):
"""AWS-managed policy.""" """AWS-managed policy."""
@ -513,6 +557,69 @@ class Role(CloudFormationModel):
def arn(self): def arn(self):
return "arn:aws:iam::{0}:role{1}{2}".format(ACCOUNT_ID, self.path, self.name) return "arn:aws:iam::{0}:role{1}{2}".format(ACCOUNT_ID, self.path, self.name)
def to_config_dict(self):
_managed_policies = []
for key in self.managed_policies.keys():
_managed_policies.append(
{"policyArn": key, "policyName": iam_backend.managed_policies[key].name}
)
_role_policy_list = []
for key, value in self.policies.items():
_role_policy_list.append(
{"policyName": key, "policyDocument": parse.quote(value)}
)
_instance_profiles = []
for key, instance_profile in iam_backend.instance_profiles.items():
for role in instance_profile.roles:
_instance_profiles.append(instance_profile.to_embedded_config_dict())
break
config_dict = {
"version": "1.3",
"configurationItemCaptureTime": str(self.create_date),
"configurationItemStatus": "ResourceDiscovered",
"configurationStateId": str(
int(time.mktime(self.create_date.timetuple()))
), # PY2 and 3 compatible
"arn": "arn:aws:iam::{}:role/{}".format(ACCOUNT_ID, self.name),
"resourceType": "AWS::IAM::Role",
"resourceId": self.name,
"resourceName": self.name,
"awsRegion": "global",
"availabilityZone": "Not Applicable",
"resourceCreationTime": str(self.create_date),
"relatedEvents": [],
"relationships": [],
"tags": self.tags,
"configuration": {
"path": self.path,
"roleName": self.name,
"roleId": self.id,
"arn": "arn:aws:iam::{}:role/{}".format(ACCOUNT_ID, self.name),
"assumeRolePolicyDocument": parse.quote(
self.assume_role_policy_document
)
if self.assume_role_policy_document
else None,
"instanceProfileList": _instance_profiles,
"rolePolicyList": _role_policy_list,
"createDate": self.create_date.isoformat(),
"attachedManagedPolicies": _managed_policies,
"permissionsBoundary": self.permissions_boundary,
"tags": list(
map(
lambda key: {"key": key, "value": self.tags[key]["Value"]},
self.tags,
)
),
"roleLastUsed": None,
},
"supplementaryConfiguration": {},
}
return config_dict
def put_policy(self, policy_name, policy_json): def put_policy(self, policy_name, policy_json):
self.policies[policy_name] = policy_json self.policies[policy_name] = policy_json
@ -590,6 +697,43 @@ class InstanceProfile(CloudFormationModel):
return self.arn return self.arn
raise UnformattedGetAttTemplateException() raise UnformattedGetAttTemplateException()
def to_embedded_config_dict(self):
# Instance Profiles aren't a config item itself, but they are returned in IAM roles with
# a "config like" json structure It's also different than Role.to_config_dict()
roles = []
for role in self.roles:
roles.append(
{
"path": role.path,
"roleName": role.name,
"roleId": role.id,
"arn": "arn:aws:iam::{}:role/{}".format(ACCOUNT_ID, role.name),
"createDate": str(role.create_date),
"assumeRolePolicyDocument": parse.quote(
role.assume_role_policy_document
),
"description": role.description,
"maxSessionDuration": None,
"permissionsBoundary": role.permissions_boundary,
"tags": list(
map(
lambda key: {"key": key, "value": role.tags[key]["Value"]},
role.tags,
)
),
"roleLastUsed": None,
}
)
return {
"path": self.path,
"instanceProfileName": self.name,
"instanceProfileId": self.id,
"arn": "arn:aws:iam::{}:instance-profile/{}".format(ACCOUNT_ID, self.name),
"createDate": str(self.create_date),
"roles": roles,
}
class Certificate(BaseModel): class Certificate(BaseModel):
def __init__(self, cert_name, cert_body, private_key, cert_chain=None, path=None): def __init__(self, cert_name, cert_body, private_key, cert_chain=None, path=None):

View File

@ -19,6 +19,7 @@ from nose.tools import raises
from datetime import datetime from datetime import datetime
from tests.helpers import requires_boto_gte from tests.helpers import requires_boto_gte
from uuid import uuid4 from uuid import uuid4
from six.moves.urllib import parse
MOCK_CERT = """-----BEGIN CERTIFICATE----- MOCK_CERT = """-----BEGIN CERTIFICATE-----
@ -2882,3 +2883,450 @@ def test_delete_role_with_instance_profiles_present():
role_names = [role["RoleName"] for role in iam.list_roles()["Roles"]] role_names = [role["RoleName"] for role in iam.list_roles()["Roles"]]
assert "Role1" in role_names assert "Role1" in role_names
assert "Role2" not in role_names assert "Role2" not in role_names
@mock_iam
def test_delete_account_password_policy_errors():
client = boto3.client("iam", region_name="us-east-1")
client.delete_account_password_policy.when.called_with().should.throw(
ClientError, "The account policy with name PasswordPolicy cannot be found."
)
@mock_iam
def test_role_list_config_discovered_resources():
from moto.iam.config import role_config_query
from moto.iam.utils import random_resource_id
# Without any roles
assert role_config_query.list_config_service_resources(None, None, 100, None) == (
[],
None,
)
# Create a role
role_config_query.backends["global"].create_role(
role_name="something",
assume_role_policy_document=None,
path="/",
permissions_boundary=None,
description="something",
tags=[],
max_session_duration=3600,
)
result = role_config_query.list_config_service_resources(None, None, 100, None)[0]
assert len(result) == 1
# The role gets a random ID, so we have to grab it
role = result[0]
assert role["type"] == "AWS::IAM::Role"
assert len(role["id"]) == len(random_resource_id())
assert role["id"] == role["name"]
assert role["region"] == "global"
@mock_iam
def test_policy_list_config_discovered_resources():
from moto.iam.config import policy_config_query
# Without any policies
assert policy_config_query.list_config_service_resources(None, None, 100, None) == (
[],
None,
)
basic_policy = {
"Version": "2012-10-17",
"Statement": [
{"Action": ["ec2:DeleteKeyPair"], "Effect": "Deny", "Resource": "*"}
],
}
# Create a role
policy_config_query.backends["global"].create_policy(
description="mypolicy",
path="",
policy_document=json.dumps(basic_policy),
policy_name="mypolicy",
)
result = policy_config_query.list_config_service_resources(None, None, 100, None)[0]
assert len(result) == 1
policy = result[0]
assert policy["type"] == "AWS::IAM::Policy"
assert policy["id"] == policy["name"] == "arn:aws:iam::123456789012:policy/mypolicy"
assert policy["region"] == "global"
@mock_iam
def test_role_config_dict():
from moto.iam.config import role_config_query, policy_config_query
from moto.iam.utils import random_resource_id
# Without any roles
assert not role_config_query.get_config_resource("something")
assert role_config_query.list_config_service_resources(None, None, 100, None) == (
[],
None,
)
basic_assume_role = {
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "sts:AssumeRole"}
],
}
basic_policy = {
"Version": "2012-10-17",
"Statement": [{"Action": ["ec2:*"], "Effect": "Allow", "Resource": "*"}],
}
# Create a policy for use in role permissions boundary
policy_config_query.backends["global"].create_policy(
description="basic_policy",
path="/",
policy_document=json.dumps(basic_policy),
policy_name="basic_policy",
)
policy_arn = policy_config_query.list_config_service_resources(
None, None, 100, None
)[0][0]["id"]
assert policy_arn is not None
# Create some roles (and grab them repeatedly since they create with random names)
role_config_query.backends["global"].create_role(
role_name="plain_role",
assume_role_policy_document=None,
path="/",
permissions_boundary=None,
description="plain_role",
tags=[{"Key": "foo", "Value": "bar"}],
max_session_duration=3600,
)
plain_role = role_config_query.list_config_service_resources(None, None, 100, None)[
0
][0]
assert plain_role is not None
assert len(plain_role["id"]) == len(random_resource_id())
role_config_query.backends["global"].create_role(
role_name="assume_role",
assume_role_policy_document=json.dumps(basic_assume_role),
path="/",
permissions_boundary=None,
description="assume_role",
tags=[],
max_session_duration=3600,
)
assume_role = next(
role
for role in role_config_query.list_config_service_resources(
None, None, 100, None
)[0]
if role["id"] not in [plain_role["id"]]
)
assert assume_role is not None
assert len(assume_role["id"]) == len(random_resource_id())
assert assume_role["id"] is not plain_role["id"]
role_config_query.backends["global"].create_role(
role_name="assume_and_permission_boundary_role",
assume_role_policy_document=json.dumps(basic_assume_role),
path="/",
permissions_boundary=policy_arn,
description="assume_and_permission_boundary_role",
tags=[],
max_session_duration=3600,
)
assume_and_permission_boundary_role = next(
role
for role in role_config_query.list_config_service_resources(
None, None, 100, None
)[0]
if role["id"] not in [plain_role["id"], assume_role["id"]]
)
assert assume_and_permission_boundary_role is not None
assert len(assume_and_permission_boundary_role["id"]) == len(random_resource_id())
assert assume_and_permission_boundary_role["id"] is not plain_role["id"]
assert assume_and_permission_boundary_role["id"] is not assume_role["id"]
role_config_query.backends["global"].create_role(
role_name="role_with_attached_policy",
assume_role_policy_document=json.dumps(basic_assume_role),
path="/",
permissions_boundary=None,
description="role_with_attached_policy",
tags=[],
max_session_duration=3600,
)
role_config_query.backends["global"].attach_role_policy(
policy_arn, "role_with_attached_policy"
)
role_with_attached_policy = next(
role
for role in role_config_query.list_config_service_resources(
None, None, 100, None
)[0]
if role["id"]
not in [
plain_role["id"],
assume_role["id"],
assume_and_permission_boundary_role["id"],
]
)
assert role_with_attached_policy is not None
assert len(role_with_attached_policy["id"]) == len(random_resource_id())
assert role_with_attached_policy["id"] is not plain_role["id"]
assert role_with_attached_policy["id"] is not assume_role["id"]
assert (
role_with_attached_policy["id"] is not assume_and_permission_boundary_role["id"]
)
role_config_query.backends["global"].create_role(
role_name="role_with_inline_policy",
assume_role_policy_document=json.dumps(basic_assume_role),
path="/",
permissions_boundary=None,
description="role_with_inline_policy",
tags=[],
max_session_duration=3600,
)
role_config_query.backends["global"].put_role_policy(
"role_with_inline_policy", "inline_policy", json.dumps(basic_policy)
)
role_with_inline_policy = next(
role
for role in role_config_query.list_config_service_resources(
None, None, 100, None
)[0]
if role["id"]
not in [
plain_role["id"],
assume_role["id"],
assume_and_permission_boundary_role["id"],
role_with_attached_policy["id"],
]
)
assert role_with_inline_policy is not None
assert len(role_with_inline_policy["id"]) == len(random_resource_id())
assert role_with_inline_policy["id"] is not plain_role["id"]
assert role_with_inline_policy["id"] is not assume_role["id"]
assert (
role_with_inline_policy["id"] is not assume_and_permission_boundary_role["id"]
)
assert role_with_inline_policy["id"] is not role_with_attached_policy["id"]
# plain role
plain_role_config = (
role_config_query.backends["global"].roles[plain_role["id"]].to_config_dict()
)
assert plain_role_config["version"] == "1.3"
assert plain_role_config["configurationItemStatus"] == "ResourceDiscovered"
assert plain_role_config["configurationStateId"] is not None
assert plain_role_config["arn"] == "arn:aws:iam::123456789012:role/plain_role"
assert plain_role_config["resourceType"] == "AWS::IAM::Role"
assert plain_role_config["resourceId"] == "plain_role"
assert plain_role_config["resourceName"] == "plain_role"
assert plain_role_config["awsRegion"] == "global"
assert plain_role_config["availabilityZone"] == "Not Applicable"
assert plain_role_config["resourceCreationTime"] is not None
assert plain_role_config["tags"] == {"foo": {"Key": "foo", "Value": "bar"}}
assert plain_role_config["configuration"]["path"] == "/"
assert plain_role_config["configuration"]["roleName"] == "plain_role"
assert plain_role_config["configuration"]["roleId"] == plain_role["id"]
assert plain_role_config["configuration"]["arn"] == plain_role_config["arn"]
assert plain_role_config["configuration"]["assumeRolePolicyDocument"] is None
assert plain_role_config["configuration"]["instanceProfileList"] == []
assert plain_role_config["configuration"]["rolePolicyList"] == []
assert plain_role_config["configuration"]["attachedManagedPolicies"] == []
assert plain_role_config["configuration"]["permissionsBoundary"] is None
assert plain_role_config["configuration"]["tags"] == [
{"key": "foo", "value": "bar"}
]
assert plain_role_config["supplementaryConfiguration"] == {}
# assume_role
assume_role_config = (
role_config_query.backends["global"].roles[assume_role["id"]].to_config_dict()
)
assert assume_role_config["arn"] == "arn:aws:iam::123456789012:role/assume_role"
assert assume_role_config["resourceId"] == "assume_role"
assert assume_role_config["resourceName"] == "assume_role"
assert assume_role_config["configuration"][
"assumeRolePolicyDocument"
] == parse.quote(json.dumps(basic_assume_role))
# assume_and_permission_boundary_role
assume_and_permission_boundary_role_config = (
role_config_query.backends["global"]
.roles[assume_and_permission_boundary_role["id"]]
.to_config_dict()
)
assert (
assume_and_permission_boundary_role_config["arn"]
== "arn:aws:iam::123456789012:role/assume_and_permission_boundary_role"
)
assert (
assume_and_permission_boundary_role_config["resourceId"]
== "assume_and_permission_boundary_role"
)
assert (
assume_and_permission_boundary_role_config["resourceName"]
== "assume_and_permission_boundary_role"
)
assert assume_and_permission_boundary_role_config["configuration"][
"assumeRolePolicyDocument"
] == parse.quote(json.dumps(basic_assume_role))
assert (
assume_and_permission_boundary_role_config["configuration"][
"permissionsBoundary"
]
== policy_arn
)
# role_with_attached_policy
role_with_attached_policy_config = (
role_config_query.backends["global"]
.roles[role_with_attached_policy["id"]]
.to_config_dict()
)
assert (
role_with_attached_policy_config["arn"]
== "arn:aws:iam::123456789012:role/role_with_attached_policy"
)
assert role_with_attached_policy_config["configuration"][
"attachedManagedPolicies"
] == [{"policyArn": policy_arn, "policyName": "basic_policy"}]
# role_with_inline_policy
role_with_inline_policy_config = (
role_config_query.backends["global"]
.roles[role_with_inline_policy["id"]]
.to_config_dict()
)
assert (
role_with_inline_policy_config["arn"]
== "arn:aws:iam::123456789012:role/role_with_inline_policy"
)
assert role_with_inline_policy_config["configuration"]["rolePolicyList"] == [
{
"policyName": "inline_policy",
"policyDocument": parse.quote(json.dumps(basic_policy)),
}
]
@mock_iam
def test_policy_config_dict():
from moto.iam.config import role_config_query, policy_config_query
from moto.iam.utils import random_policy_id
# Without any roles
assert not policy_config_query.get_config_resource(
"arn:aws:iam::123456789012:policy/basic_policy"
)
assert policy_config_query.list_config_service_resources(None, None, 100, None) == (
[],
None,
)
basic_policy = {
"Version": "2012-10-17",
"Statement": [{"Action": ["ec2:*"], "Effect": "Allow", "Resource": "*"}],
}
basic_policy_v2 = {
"Version": "2012-10-17",
"Statement": [
{"Action": ["ec2:*", "s3:*"], "Effect": "Allow", "Resource": "*"}
],
}
policy_config_query.backends["global"].create_policy(
description="basic_policy",
path="/",
policy_document=json.dumps(basic_policy),
policy_name="basic_policy",
)
policy_arn = policy_config_query.list_config_service_resources(
None, None, 100, None
)[0][0]["id"]
assert policy_arn == "arn:aws:iam::123456789012:policy/basic_policy"
assert (
policy_config_query.get_config_resource(
"arn:aws:iam::123456789012:policy/basic_policy"
)
is not None
)
# Create a new version
policy_config_query.backends["global"].create_policy_version(
policy_arn, json.dumps(basic_policy_v2), "true"
)
# Create role to trigger attachment
role_config_query.backends["global"].create_role(
role_name="role_with_attached_policy",
assume_role_policy_document=None,
path="/",
permissions_boundary=None,
description="role_with_attached_policy",
tags=[],
max_session_duration=3600,
)
role_config_query.backends["global"].attach_role_policy(
policy_arn, "role_with_attached_policy"
)
policy = (
role_config_query.backends["global"]
.managed_policies["arn:aws:iam::123456789012:policy/basic_policy"]
.to_config_dict()
)
assert policy["version"] == "1.3"
assert policy["configurationItemCaptureTime"] is not None
assert policy["configurationItemStatus"] == "OK"
assert policy["configurationStateId"] is not None
assert policy["arn"] == "arn:aws:iam::123456789012:policy/basic_policy"
assert policy["resourceType"] == "AWS::IAM::Policy"
assert len(policy["resourceId"]) == len(random_policy_id())
assert policy["resourceName"] == "basic_policy"
assert policy["awsRegion"] == "global"
assert policy["availabilityZone"] == "Not Applicable"
assert policy["resourceCreationTime"] is not None
assert policy["configuration"]["policyName"] == policy["resourceName"]
assert policy["configuration"]["policyId"] == policy["resourceId"]
assert policy["configuration"]["arn"] == policy["arn"]
assert policy["configuration"]["path"] == "/"
assert policy["configuration"]["defaultVersionId"] == "v2"
assert policy["configuration"]["attachmentCount"] == 1
assert policy["configuration"]["permissionsBoundaryUsageCount"] == 0
assert policy["configuration"]["isAttachable"] == True
assert policy["configuration"]["description"] == "basic_policy"
assert policy["configuration"]["createDate"] is not None
assert policy["configuration"]["updateDate"] is not None
assert policy["configuration"]["policyVersionList"] == [
{
"document": str(parse.quote(json.dumps(basic_policy))),
"versionId": "v1",
"isDefaultVersion": False,
"createDate": policy["configuration"]["policyVersionList"][0]["createDate"],
},
{
"document": str(parse.quote(json.dumps(basic_policy_v2))),
"versionId": "v2",
"isDefaultVersion": True,
"createDate": policy["configuration"]["policyVersionList"][1]["createDate"],
},
]
assert policy["supplementaryConfiguration"] == {}