Merge pull request #3202 from nick123pig/config-for-iam-role-policy

Config for IAM roles and policies
This commit is contained in:
Mike Grima 2020-09-21 19:15:06 -07:00 committed by GitHub
commit 672a4b9a27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1477 additions and 17 deletions

View File

@ -23,8 +23,8 @@ However, this will only work on resource types that have this enabled.
### Current enabled resource types:
1. S3
1. S3 (all)
1. IAM (Role, Policy)
## Developer Guide
@ -53,15 +53,14 @@ An example of the above is implemented for S3. You can see that by looking at:
1. `moto/s3/config.py`
1. `moto/config/models.py`
As well as the corresponding unit tests in:
### Testing
For each resource type, you will need to test write tests for a few separate areas:
1. `tests/s3/test_s3.py`
1. `tests/config/test_config.py`
- Test the backend queries to ensure discovered resources come back (ie for `IAM::Policy`, write `tests.tests_iam.test_policy_list_config_discovered_resources`). For writing these tests, you must not make use of `boto` to create resources. You will need to use the backend model methods to provision the resources. This is to make tests compatible with the moto server. You must make tests for the resource type to test listing and object fetching.
Note for unit testing, you will want to add a test to ensure that you can query all the resources effectively. For testing this feature,
the unit tests for the `ConfigQueryModel` will not make use of `boto` to create resources, such as S3 buckets. You will need to use the
backend model methods to provision the resources. This is to make tests compatible with the moto server. You should absolutely make tests
in the resource type to test listing and object fetching.
- Test the config dict for all scenarios (ie for `IAM::Policy`, write `tests.tests_iam.test_policy_config_dict`). For writing this test, you'll need to create resources in the same way as the first test (without using `boto`), in every meaningful configuration that would produce a different config dict. Then, query the backend and ensure each of the dicts are as you expect.
- Test that everything works end to end with the `boto` clients. (ie for `IAM::Policy`, write `tests.tests_iam.test_policy_config_client`). The main two items to test will be the `boto.client('config').list_discovered_resources()`, `boto.client('config').list_aggregate_discovered_resources()`, `moto.client('config').batch_get_resource_config()`, and `moto.client('config').batch_aggregate_get_resource_config()`. This test doesn't have to be super thorough, but it basically tests that the front end and backend logic all works together and returns correct resources. Beware the aggregate methods all have capital first letters (ie `Limit`), while non-aggregate methods have lowercase first letters (ie `limit`)
### Listing
S3 is currently the model implementation, but it also odd in that S3 is a global resource type with regional resource residency.
@ -117,4 +116,4 @@ return for it.
When implementing resource config fetching, you will need to return at a minimum `None` if the resource is not found, or a `dict` that looks
like what AWS Config would return.
It's recommended to read the comment for the `ConfigQueryModel` 's `get_config_resource` function in [base class here](moto/core/models.py).
It's recommended to read the comment for the `ConfigQueryModel` 's `get_config_resource` function in [base class here](moto/core/models.py).

View File

@ -47,9 +47,10 @@ from moto.config.exceptions import (
from moto.core import BaseBackend, BaseModel
from moto.s3.config import s3_account_public_access_block_query, s3_config_query
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
from moto.iam.config import role_config_query, policy_config_query
POP_STRINGS = [
"capitalizeStart",
"CapitalizeStart",
@ -64,6 +65,8 @@ DEFAULT_PAGE_SIZE = 100
RESOURCE_MAP = {
"AWS::S3::Bucket": s3_config_query,
"AWS::S3::AccountPublicAccessBlock": s3_account_public_access_block_query,
"AWS::IAM::Role": role_config_query,
"AWS::IAM::Policy": policy_config_query,
}
@ -977,6 +980,7 @@ class ConfigBackend(BaseBackend):
limit,
next_token,
resource_region=resource_region,
aggregator=self.config_aggregators.get(aggregator_name).__dict__,
)
resource_identifiers = []
@ -987,7 +991,6 @@ class ConfigBackend(BaseBackend):
"ResourceType": identifier["type"],
"ResourceId": identifier["id"],
}
if identifier.get("name"):
item["ResourceName"] = identifier["name"]

View File

@ -27,7 +27,6 @@ from .utils import (
convert_flask_to_responses_response,
)
ACCOUNT_ID = os.environ.get("MOTO_ACCOUNT_ID", "123456789012")
@ -692,6 +691,7 @@ class ConfigQueryModel(object):
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
"""For AWS Config. This will list all of the resources of the given type and optional resource name and region.
@ -723,6 +723,10 @@ class ConfigQueryModel(object):
:param backend_region: The region for the backend to pull results from. Set to `None` if this is an aggregated query.
:param resource_region: The region for where the resources reside to pull results from. Set to `None` if this is a
non-aggregated query.
:param aggregator: If the query is an aggregated query, *AND* the resource has "non-standard" aggregation logic (mainly, IAM),
you'll need to pass aggregator used. In most cases, this should be omitted/set to `None`. See the
conditional logic under `if aggregator` in the moto/iam/config.py for the IAM example.
:return: This should return a list of Dicts that have the following fields:
[
{

321
moto/iam/config.py Normal file
View File

@ -0,0 +1,321 @@
import json
import boto3
from moto.core.exceptions import InvalidNextTokenException
from moto.core.models import ConfigQueryModel
from moto.iam import iam_backends
class RoleConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
# IAM roles are "global" and aren't assigned into any availability zone
# The resource ID is a AWS-assigned random string like "AROA0BSVNSZKXVHS00SBJ"
# The resource name is a user-assigned string like "MyDevelopmentAdminRole"
# Stored in moto backend with the AWS-assigned random string like "AROA0BSVNSZKXVHS00SBJ"
# Grab roles from backend; need the full values since names and id's are different
role_list = list(self.backends["global"].roles.values())
if not role_list:
return [], None
# Filter by resource name or ids
if resource_name or resource_ids:
filtered_roles = []
# resource_name takes precedence over resource_ids
if resource_name:
for role in role_list:
if role.name == resource_name:
filtered_roles = [role]
break
# but if both are passed, it must be a subset
if filtered_roles and resource_ids:
if filtered_roles[0].id not in resource_ids:
return [], None
else:
for role in role_list:
if role.id in resource_ids:
filtered_roles.append(role)
# Filtered roles are now the subject for the listing
role_list = filtered_roles
if aggregator:
# IAM is a little special; Roles are created in us-east-1 (which AWS calls the "global" region)
# However, the resource will return in the aggregator (in duplicate) for each region in the aggregator
# Therefore, we'll need to find out the regions where the aggregators are running, and then duplicate the resource there
# In practice, it looks like AWS will only duplicate these resources if you've "used" any roles in the region, but since
# we can't really tell if this has happened in moto, we'll just bind this to the regions in your aggregator
aggregated_regions = []
aggregator_sources = aggregator.get(
"account_aggregation_sources"
) or aggregator.get("organization_aggregation_source")
for source in aggregator_sources:
source_dict = source.__dict__
if source_dict.get("all_aws_regions", False):
aggregated_regions = boto3.Session().get_available_regions("config")
break
for region in source_dict.get("aws_regions", []):
aggregated_regions.append(region)
duplicate_role_list = []
for region in list(set(aggregated_regions)):
for role in role_list:
duplicate_role_list.append(
{
"_id": "{}{}".format(
role.id, region
), # this is only for sorting, isn't returned outside of this functin
"type": "AWS::IAM::Role",
"id": role.id,
"name": role.name,
"region": region,
}
)
# Pagination logic, sort by role id
sorted_roles = sorted(duplicate_role_list, key=lambda role: role["_id"])
else:
# Non-aggregated queries are in the else block, and we can treat these like a normal config resource
# Pagination logic, sort by role id
sorted_roles = sorted(role_list, key=lambda role: role.id)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
try:
# Find the index of the next
start = next(
index
for (index, r) in enumerate(sorted_roles)
if next_token == (r["_id"] if aggregator else r.id)
)
except StopIteration:
raise InvalidNextTokenException()
# Get the list of items to collect:
role_list = sorted_roles[start : (start + limit)]
if len(sorted_roles) > (start + limit):
record = sorted_roles[start + limit]
new_token = record["_id"] if aggregator else record.id
return (
[
{
"type": "AWS::IAM::Role",
"id": role["id"] if aggregator else role.id,
"name": role["name"] if aggregator else role.name,
"region": role["region"] if aggregator else "global",
}
for role in role_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
role = self.backends["global"].roles.get(resource_id, {})
if not role:
return
if resource_name and role.name != resource_name:
return
# Format the role to the AWS Config format:
config_data = role.to_config_dict()
# The 'configuration' field is also a JSON string:
config_data["configuration"] = json.dumps(config_data["configuration"])
# Supplementary config need all values converted to JSON strings if they are not strings already:
for field, value in config_data["supplementaryConfiguration"].items():
if not isinstance(value, str):
config_data["supplementaryConfiguration"][field] = json.dumps(value)
return config_data
class PolicyConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
# IAM policies are "global" and aren't assigned into any availability zone
# The resource ID is a AWS-assigned random string like "ANPA0BSVNSZK00SJSPVUJ"
# The resource name is a user-assigned string like "my-development-policy"
# Stored in moto backend with the arn like "arn:aws:iam::123456789012:policy/my-development-policy"
policy_list = list(self.backends["global"].managed_policies.values())
# We don't want to include AWS Managed Policies. This technically needs to
# respect the configuration recorder's 'includeGlobalResourceTypes' setting,
# but it's default set be default, and moto's config doesn't yet support
# custom configuration recorders, we'll just behave as default.
policy_list = list(
filter(
lambda policy: not policy.arn.startswith("arn:aws:iam::aws"),
policy_list,
)
)
if not policy_list:
return [], None
# Filter by resource name or ids
if resource_name or resource_ids:
filtered_policies = []
# resource_name takes precedence over resource_ids
if resource_name:
for policy in policy_list:
if policy.name == resource_name:
filtered_policies = [policy]
break
# but if both are passed, it must be a subset
if filtered_policies and resource_ids:
if filtered_policies[0].id not in resource_ids:
return [], None
else:
for policy in policy_list:
if policy.id in resource_ids:
filtered_policies.append(policy)
# Filtered roles are now the subject for the listing
policy_list = filtered_policies
if aggregator:
# IAM is a little special; Policies are created in us-east-1 (which AWS calls the "global" region)
# However, the resource will return in the aggregator (in duplicate) for each region in the aggregator
# Therefore, we'll need to find out the regions where the aggregators are running, and then duplicate the resource there
# In practice, it looks like AWS will only duplicate these resources if you've "used" any policies in the region, but since
# we can't really tell if this has happened in moto, we'll just bind this to the regions in your aggregator
aggregated_regions = []
aggregator_sources = aggregator.get(
"account_aggregation_sources"
) or aggregator.get("organization_aggregation_source")
for source in aggregator_sources:
source_dict = source.__dict__
if source_dict.get("all_aws_regions", False):
aggregated_regions = boto3.Session().get_available_regions("config")
break
for region in source_dict.get("aws_regions", []):
aggregated_regions.append(region)
duplicate_policy_list = []
for region in list(set(aggregated_regions)):
for policy in policy_list:
duplicate_policy_list.append(
{
"_id": "{}{}".format(
policy.id, region
), # this is only for sorting, isn't returned outside of this functin
"type": "AWS::IAM::Policy",
"id": policy.id,
"name": policy.name,
"region": region,
}
)
# Pagination logic, sort by role id
sorted_policies = sorted(
duplicate_policy_list, key=lambda policy: policy["_id"]
)
else:
# Non-aggregated queries are in the else block, and we can treat these like a normal config resource
# Pagination logic, sort by role id
sorted_policies = sorted(policy_list, key=lambda role: role.id)
new_token = None
# Get the start:
if not next_token:
start = 0
else:
try:
# Find the index of the next
start = next(
index
for (index, p) in enumerate(sorted_policies)
if next_token == (p["_id"] if aggregator else p.id)
)
except StopIteration:
raise InvalidNextTokenException()
# Get the list of items to collect:
policy_list = sorted_policies[start : (start + limit)]
if len(sorted_policies) > (start + limit):
record = sorted_policies[start + limit]
new_token = record["_id"] if aggregator else record.id
return (
[
{
"type": "AWS::IAM::Policy",
"id": policy["id"] if aggregator else policy.id,
"name": policy["name"] if aggregator else policy.name,
"region": policy["region"] if aggregator else "global",
}
for policy in policy_list
],
new_token,
)
def get_config_resource(
self, resource_id, resource_name=None, backend_region=None, resource_region=None
):
# policies are listed in the backend as arns, but we have to accept the PolicyID as the resource_id
# we'll make a really crude search for it
policy = None
for arn in self.backends["global"].managed_policies.keys():
policy_candidate = self.backends["global"].managed_policies[arn]
if policy_candidate.id == resource_id:
policy = policy_candidate
break
if not policy:
return
if resource_name and policy.name != resource_name:
return
# Format the policy to the AWS Config format:
config_data = policy.to_config_dict()
# The 'configuration' field is also a JSON string:
config_data["configuration"] = json.dumps(config_data["configuration"])
# Supplementary config need all values converted to JSON strings if they are not strings already:
for field, value in config_data["supplementaryConfiguration"].items():
if not isinstance(value, str):
config_data["supplementaryConfiguration"][field] = json.dumps(value)
return config_data
role_config_query = RoleConfigQuery(iam_backends)
policy_config_query = PolicyConfigQuery(iam_backends)

View File

@ -8,11 +8,12 @@ import sys
from datetime import datetime
import json
import re
import time
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from six.moves.urllib.parse import urlparse
from six.moves.urllib import parse
from moto.core.exceptions import RESTError
from moto.core import BaseBackend, BaseModel, ACCOUNT_ID, CloudFormationModel
from moto.core.utils import (
@ -153,7 +154,7 @@ class OpenIDConnectProvider(BaseModel):
self._errors = []
self._validate(url, thumbprint_list, client_id_list)
parsed_url = urlparse(url)
parsed_url = parse.urlparse(url)
self.url = parsed_url.netloc + parsed_url.path
self.thumbprint_list = thumbprint_list
self.client_id_list = client_id_list
@ -201,7 +202,7 @@ class OpenIDConnectProvider(BaseModel):
self._raise_errors()
parsed_url = urlparse(url)
parsed_url = parse.urlparse(url)
if not parsed_url.scheme or not parsed_url.netloc:
raise ValidationError("Invalid Open ID Connect Provider URL")
@ -265,6 +266,48 @@ class ManagedPolicy(Policy):
def arn(self):
return "arn:aws:iam::{0}:policy{1}{2}".format(ACCOUNT_ID, self.path, self.name)
def to_config_dict(self):
return {
"version": "1.3",
"configurationItemCaptureTime": str(self.create_date),
"configurationItemStatus": "OK",
"configurationStateId": str(
int(time.mktime(self.create_date.timetuple()))
), # PY2 and 3 compatible
"arn": "arn:aws:iam::{}:policy/{}".format(ACCOUNT_ID, self.name),
"resourceType": "AWS::IAM::Policy",
"resourceId": self.id,
"resourceName": self.name,
"awsRegion": "global",
"availabilityZone": "Not Applicable",
"resourceCreationTime": str(self.create_date),
"configuration": {
"policyName": self.name,
"policyId": self.id,
"arn": "arn:aws:iam::{}:policy/{}".format(ACCOUNT_ID, self.name),
"path": self.path,
"defaultVersionId": self.default_version_id,
"attachmentCount": self.attachment_count,
"permissionsBoundaryUsageCount": 0,
"isAttachable": ManagedPolicy.is_attachable,
"description": self.description,
"createDate": str(self.create_date.isoformat()),
"updateDate": str(self.create_date.isoformat()),
"policyVersionList": list(
map(
lambda version: {
"document": parse.quote(version.document),
"versionId": version.version_id,
"isDefaultVersion": version.is_default,
"createDate": str(version.create_date),
},
self.versions,
)
),
},
"supplementaryConfiguration": {},
}
class AWSManagedPolicy(ManagedPolicy):
"""AWS-managed policy."""
@ -513,6 +556,69 @@ class Role(CloudFormationModel):
def arn(self):
return "arn:aws:iam::{0}:role{1}{2}".format(ACCOUNT_ID, self.path, self.name)
def to_config_dict(self):
_managed_policies = []
for key in self.managed_policies.keys():
_managed_policies.append(
{"policyArn": key, "policyName": iam_backend.managed_policies[key].name}
)
_role_policy_list = []
for key, value in self.policies.items():
_role_policy_list.append(
{"policyName": key, "policyDocument": parse.quote(value)}
)
_instance_profiles = []
for key, instance_profile in iam_backend.instance_profiles.items():
for role in instance_profile.roles:
_instance_profiles.append(instance_profile.to_embedded_config_dict())
break
config_dict = {
"version": "1.3",
"configurationItemCaptureTime": str(self.create_date),
"configurationItemStatus": "ResourceDiscovered",
"configurationStateId": str(
int(time.mktime(self.create_date.timetuple()))
), # PY2 and 3 compatible
"arn": "arn:aws:iam::{}:role/{}".format(ACCOUNT_ID, self.name),
"resourceType": "AWS::IAM::Role",
"resourceId": self.name,
"resourceName": self.name,
"awsRegion": "global",
"availabilityZone": "Not Applicable",
"resourceCreationTime": str(self.create_date),
"relatedEvents": [],
"relationships": [],
"tags": self.tags,
"configuration": {
"path": self.path,
"roleName": self.name,
"roleId": self.id,
"arn": "arn:aws:iam::{}:role/{}".format(ACCOUNT_ID, self.name),
"assumeRolePolicyDocument": parse.quote(
self.assume_role_policy_document
)
if self.assume_role_policy_document
else None,
"instanceProfileList": _instance_profiles,
"rolePolicyList": _role_policy_list,
"createDate": self.create_date.isoformat(),
"attachedManagedPolicies": _managed_policies,
"permissionsBoundary": self.permissions_boundary,
"tags": list(
map(
lambda key: {"key": key, "value": self.tags[key]["Value"]},
self.tags,
)
),
"roleLastUsed": None,
},
"supplementaryConfiguration": {},
}
return config_dict
def put_policy(self, policy_name, policy_json):
self.policies[policy_name] = policy_json
@ -590,6 +696,43 @@ class InstanceProfile(CloudFormationModel):
return self.arn
raise UnformattedGetAttTemplateException()
def to_embedded_config_dict(self):
# Instance Profiles aren't a config item itself, but they are returned in IAM roles with
# a "config like" json structure It's also different than Role.to_config_dict()
roles = []
for role in self.roles:
roles.append(
{
"path": role.path,
"roleName": role.name,
"roleId": role.id,
"arn": "arn:aws:iam::{}:role/{}".format(ACCOUNT_ID, role.name),
"createDate": str(role.create_date),
"assumeRolePolicyDocument": parse.quote(
role.assume_role_policy_document
),
"description": role.description,
"maxSessionDuration": None,
"permissionsBoundary": role.permissions_boundary,
"tags": list(
map(
lambda key: {"key": key, "value": role.tags[key]["Value"]},
role.tags,
)
),
"roleLastUsed": None,
}
)
return {
"path": self.path,
"instanceProfileName": self.name,
"instanceProfileId": self.id,
"arn": "arn:aws:iam::{}:instance-profile/{}".format(ACCOUNT_ID, self.name),
"createDate": str(self.create_date),
"roles": roles,
}
class Certificate(BaseModel):
def __init__(self, cert_name, cert_body, private_key, cert_chain=None, path=None):

View File

@ -19,6 +19,7 @@ class S3ConfigQuery(ConfigQueryModel):
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
# The resource_region only matters for aggregated queries as you can filter on bucket regions for them.
# For other resource types, you would need to iterate appropriately for the backend_region.
@ -132,6 +133,7 @@ class S3AccountPublicAccessBlockConfigQuery(ConfigQueryModel):
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
# For the Account Public Access Block, they are the same for all regions. The resource ID is the AWS account ID
# There is no resource name -- it should be a blank string "" if provided.

File diff suppressed because it is too large Load Diff