IAM: Only load AWS Managed Policies on request (#7219)

This commit is contained in:
Bert Blommers 2024-01-17 10:01:57 +00:00
parent 0c8ccbb406
commit f834d98314
7 changed files with 65 additions and 28 deletions

View File

@ -16,9 +16,18 @@ class _core_config(TypedDict, total=False):
passthrough: _passthrough_config passthrough: _passthrough_config
class _iam_config(TypedDict, total=False):
load_aws_managed_policies: bool
DefaultConfig = TypedDict( DefaultConfig = TypedDict(
"DefaultConfig", "DefaultConfig",
{"batch": _docker_config, "core": _core_config, "lambda": _docker_config}, {
"batch": _docker_config,
"core": _core_config,
"lambda": _docker_config,
"iam": _iam_config,
},
total=False, total=False,
) )
@ -26,6 +35,7 @@ default_user_config: DefaultConfig = {
"batch": {"use_docker": True}, "batch": {"use_docker": True},
"lambda": {"use_docker": True}, "lambda": {"use_docker": True},
"core": {"mock_credentials": True, "passthrough": {"urls": [], "services": []}}, "core": {"mock_credentials": True, "passthrough": {"urls": [], "services": []}},
"iam": {"load_aws_managed_policies": False},
} }

View File

@ -1,5 +1,4 @@
import base64 import base64
import copy
import json import json
import os import os
import re import re
@ -31,6 +30,7 @@ from moto.iam.policy_validation import (
IAMTrustPolicyDocumentValidator, IAMTrustPolicyDocumentValidator,
) )
from moto.moto_api._internal import mock_random as random from moto.moto_api._internal import mock_random as random
from moto.settings import load_iam_aws_managed_policies
from moto.utilities.utils import md5_hash from moto.utilities.utils import md5_hash
from ..utilities.tagging_service import TaggingService from ..utilities.tagging_service import TaggingService
@ -1795,12 +1795,7 @@ def filter_items_with_path_prefix(
class IAMBackend(BaseBackend): class IAMBackend(BaseBackend):
def __init__( def __init__(self, region_name: str, account_id: str):
self,
region_name: str,
account_id: str,
aws_policies: Optional[List[ManagedPolicy]] = None,
):
super().__init__(region_name=region_name, account_id=account_id) super().__init__(region_name=region_name, account_id=account_id)
self.instance_profiles: Dict[str, InstanceProfile] = {} self.instance_profiles: Dict[str, InstanceProfile] = {}
self.roles: Dict[str, Role] = {} self.roles: Dict[str, Role] = {}
@ -1808,7 +1803,7 @@ class IAMBackend(BaseBackend):
self.groups: Dict[str, Group] = {} self.groups: Dict[str, Group] = {}
self.users: Dict[str, User] = {} self.users: Dict[str, User] = {}
self.credential_report: Optional[bool] = None self.credential_report: Optional[bool] = None
self.aws_managed_policies = aws_policies or self._init_aws_policies() self.aws_managed_policies = self._init_aws_policies()
self.managed_policies = self._init_managed_policies() self.managed_policies = self._init_managed_policies()
self.account_aliases: List[str] = [] self.account_aliases: List[str] = []
self.saml_providers: Dict[str, SAMLProvider] = {} self.saml_providers: Dict[str, SAMLProvider] = {}
@ -1825,6 +1820,8 @@ class IAMBackend(BaseBackend):
self.initialize_service_roles() self.initialize_service_roles()
def _init_aws_policies(self) -> List[ManagedPolicy]: def _init_aws_policies(self) -> List[ManagedPolicy]:
if not load_iam_aws_managed_policies():
return []
# AWS defines some of its own managed policies # AWS defines some of its own managed policies
# we periodically import them via `make aws_managed_policies` # we periodically import them via `make aws_managed_policies`
aws_managed_policies_data_parsed = json.loads(aws_managed_policies_data) aws_managed_policies_data_parsed = json.loads(aws_managed_policies_data)
@ -1834,15 +1831,7 @@ class IAMBackend(BaseBackend):
] ]
def _init_managed_policies(self) -> Dict[str, ManagedPolicy]: def _init_managed_policies(self) -> Dict[str, ManagedPolicy]:
return dict((p.arn, copy.deepcopy(p)) for p in self.aws_managed_policies) return dict((p.arn, p) for p in self.aws_managed_policies)
def reset(self) -> None:
region_name = self.region_name
account_id = self.account_id
# Do not reset these policies, as they take a long time to load
aws_policies = self.aws_managed_policies
self.__dict__ = {}
IAMBackend.__init__(self, region_name, account_id, aws_policies)
def initialize_service_roles(self) -> None: def initialize_service_roles(self) -> None:
pass pass

View File

@ -4,6 +4,8 @@ import pathlib
from functools import lru_cache from functools import lru_cache
from typing import List, Optional from typing import List, Optional
from moto.core.config import default_user_config
def test_proxy_mode() -> bool: def test_proxy_mode() -> bool:
return os.environ.get("TEST_PROXY_MODE", "0").lower() == "true" return os.environ.get("TEST_PROXY_MODE", "0").lower() == "true"
@ -177,3 +179,11 @@ def get_cognito_idp_user_pool_id_strategy() -> Optional[str]:
def enable_iso_regions() -> bool: def enable_iso_regions() -> bool:
return os.environ.get("MOTO_ENABLE_ISO_REGIONS", "false").lower() == "true" return os.environ.get("MOTO_ENABLE_ISO_REGIONS", "false").lower() == "true"
def load_iam_aws_managed_policies() -> bool:
return (
default_user_config.get("iam", {}).get("load_aws_managed_policies", False)
is True
or os.environ.get("MOTO_IAM_LOAD_MANAGED_POLICIES", "").lower() == "true"
)

View File

@ -706,8 +706,10 @@ def test_get_policy():
assert policy["Policy"]["Arn"] == f"arn:aws:iam::{ACCOUNT_ID}:policy/TestGetPolicy" assert policy["Policy"]["Arn"] == f"arn:aws:iam::{ACCOUNT_ID}:policy/TestGetPolicy"
@mock_aws @mock_aws(config={"iam": {"load_aws_managed_policies": True}})
def test_get_aws_managed_policy(): def test_get_aws_managed_policy():
if settings.TEST_SERVER_MODE:
raise SkipTest("Policies not loaded in ServerMode")
conn = boto3.client("iam", region_name="us-east-1") conn = boto3.client("iam", region_name="us-east-1")
managed_policy_arn = "arn:aws:iam::aws:policy/IAMUserChangePassword" managed_policy_arn = "arn:aws:iam::aws:policy/IAMUserChangePassword"
managed_policy_create_date = datetime.strptime( managed_policy_create_date = datetime.strptime(
@ -742,8 +744,10 @@ def test_get_policy_version():
assert retrieved.get("PolicyVersion")["IsDefaultVersion"] is False assert retrieved.get("PolicyVersion")["IsDefaultVersion"] is False
@mock_aws @mock_aws(config={"iam": {"load_aws_managed_policies": True}})
def test_get_aws_managed_policy_version(): def test_get_aws_managed_policy_version():
if settings.TEST_SERVER_MODE:
raise SkipTest("Policies not loaded in ServerMode")
conn = boto3.client("iam", region_name="us-east-1") conn = boto3.client("iam", region_name="us-east-1")
managed_policy_arn = ( managed_policy_arn = (
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
@ -763,8 +767,10 @@ def test_get_aws_managed_policy_version():
assert isinstance(retrieved["PolicyVersion"]["Document"], dict) assert isinstance(retrieved["PolicyVersion"]["Document"], dict)
@mock_aws @mock_aws(config={"iam": {"load_aws_managed_policies": True}})
def test_get_aws_managed_policy_v6_version(): def test_get_aws_managed_policy_v6_version():
if settings.TEST_SERVER_MODE:
raise SkipTest("Policies not loaded in ServerMode")
conn = boto3.client("iam", region_name="us-east-1") conn = boto3.client("iam", region_name="us-east-1")
managed_policy_arn = "arn:aws:iam::aws:policy/job-function/SystemAdministrator" managed_policy_arn = "arn:aws:iam::aws:policy/job-function/SystemAdministrator"
with pytest.raises(ClientError): with pytest.raises(ClientError):
@ -1953,8 +1959,10 @@ def test_get_access_key_last_used_when_used():
assert resp["AccessKeyLastUsed"]["Region"] == "us-east-1" assert resp["AccessKeyLastUsed"]["Region"] == "us-east-1"
@mock_aws @mock_aws(config={"iam": {"load_aws_managed_policies": True}})
def test_managed_policy(): def test_managed_policy():
if settings.TEST_SERVER_MODE:
raise SkipTest("Policies not loaded in ServerMode")
conn = boto3.client("iam", region_name="us-west-1") conn = boto3.client("iam", region_name="us-west-1")
conn.create_policy( conn.create_policy(
@ -2364,8 +2372,10 @@ def test_delete_ssh_public_key():
assert len(resp["SSHPublicKeys"]) == 0 assert len(resp["SSHPublicKeys"]) == 0
@mock_aws @mock_aws(config={"iam": {"load_aws_managed_policies": True}})
def test_get_account_authorization_details(): def test_get_account_authorization_details():
if settings.TEST_SERVER_MODE:
raise SkipTest("Policies not loaded in ServerMode")
test_policy = json.dumps( test_policy = json.dumps(
{ {
"Version": "2012-10-17", "Version": "2012-10-17",

View File

@ -1,5 +1,6 @@
import json import json
from datetime import datetime from datetime import datetime
from unittest import SkipTest
import boto3 import boto3
import pytest import pytest
@ -213,8 +214,10 @@ def test_put_group_policy():
) )
@mock_aws @mock_aws(config={"iam": {"load_aws_managed_policies": True}})
def test_attach_group_policies(): def test_attach_group_policies():
if settings.TEST_SERVER_MODE:
raise SkipTest("Policies not loaded in ServerMode")
conn = boto3.client("iam", region_name="us-east-1") conn = boto3.client("iam", region_name="us-east-1")
conn.create_group(GroupName="my-group") conn.create_group(GroupName="my-group")
assert ( assert (

View File

@ -1,13 +1,17 @@
import json import json
import os
from unittest import SkipTest, mock
import boto3 import boto3
from moto import mock_aws from moto import mock_aws, settings
# Test IAM User Inline Policy # Test IAM User Inline Policy
def test_policies_are_not_kept_after_mock_ends(): def test_policies_are_not_kept_after_mock_ends():
with mock_aws(): if settings.TEST_SERVER_MODE:
raise SkipTest("Policies not loaded in ServerMode")
with mock_aws(config={"iam": {"load_aws_managed_policies": True}}):
iam_client = boto3.client("iam", "us-east-1") iam_client = boto3.client("iam", "us-east-1")
role_name = "test" role_name = "test"
assume_role_policy_document = { assume_role_policy_document = {
@ -34,6 +38,17 @@ def test_policies_are_not_kept_after_mock_ends():
assert iam_policies[0]["Arn"] == "arn:aws:iam::aws:policy/ReadOnlyAccess" assert iam_policies[0]["Arn"] == "arn:aws:iam::aws:policy/ReadOnlyAccess"
assert iam_client.list_roles()["Roles"][0]["RoleName"] == "test" assert iam_client.list_roles()["Roles"][0]["RoleName"] == "test"
with mock_aws(): with mock_aws(config={"iam": {"load_aws_managed_policies": True}}):
resp = iam_client.list_policies(Scope="AWS", OnlyAttached=True) resp = iam_client.list_policies(Scope="AWS", OnlyAttached=True)
assert len(resp["Policies"]) == 0 assert len(resp["Policies"]) == 0
def test_policies_are_loaded_when_using_env_variable():
if settings.TEST_SERVER_MODE:
raise SkipTest("EnvVar not loaded in ServerMode")
with mock.patch.dict(os.environ, {"MOTO_IAM_LOAD_MANAGED_POLICIES": "true"}):
with mock_aws():
iam_client = boto3.client("iam", "us-east-1")
iam_policies = iam_client.list_policies(Scope="AWS")["Policies"]
assert len(iam_policies) > 10

View File

@ -41,7 +41,7 @@ class TestDifferentAccountsDoesNotBreakSeeding:
instances = self.ec2_client.run_instances(MaxCount=1, MinCount=1)["Instances"] instances = self.ec2_client.run_instances(MaxCount=1, MinCount=1)["Instances"]
instance_ids = [instance["InstanceId"] for instance in instances] instance_ids = [instance["InstanceId"] for instance in instances]
assert instance_ids == ["i-0df6e943394d7fdb0"] assert instance_ids == ["i-73bd4755d05ad7853"]
def test_1(self) -> None: def test_1(self) -> None:
# Create some data in a different account (111111111111) # Create some data in a different account (111111111111)