moto/tests/test_iam/test_iam.py
Steve Pulec 6107561717
Merge pull request #3451 from bpandola/fix-3450
Fix: Return `Tags` in iam:CreateUserResponse
2020-11-15 11:25:10 -06:00

4022 lines
141 KiB
Python

from __future__ import unicode_literals
import base64
import json
import boto
import boto3
import csv
import sure # noqa
from boto.exception import BotoServerError
from botocore.exceptions import ClientError
from moto import mock_config, mock_iam, mock_iam_deprecated, settings
from moto.core import ACCOUNT_ID
from moto.iam.models import aws_managed_policies
from moto.backends import get_backend
import pytest
from datetime import datetime
from tests.helpers import requires_boto_gte
from uuid import uuid4
from six.moves.urllib import parse
# PEM-formatted certificate text (note the BEGIN/END CERTIFICATE markers).
# Not referenced by the tests visible in this part of the file; presumably
# used as a certificate-body fixture further down -- confirm against callers.
MOCK_CERT = """-----BEGIN CERTIFICATE-----
MIIBpzCCARACCQCY5yOdxCTrGjANBgkqhkiG9w0BAQsFADAXMRUwEwYDVQQKDAxt
b3RvIHRlc3RpbmcwIBcNMTgxMTA1MTkwNTIwWhgPMjI5MjA4MTkxOTA1MjBaMBcx
FTATBgNVBAoMDG1vdG8gdGVzdGluZzCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkC
gYEA1Jn3g2h7LD3FLqdpcYNbFXCS4V4eDpuTCje9vKFcC3pi/01147X3zdfPy8Mt
ZhKxcREOwm4NXykh23P9KW7fBovpNwnbYsbPqj8Hf1ZaClrgku1arTVhEnKjx8zO
vaR/bVLCss4uE0E0VM1tJn/QGQsfthFsjuHtwx8uIWz35tUCAwEAATANBgkqhkiG
9w0BAQsFAAOBgQBWdOQ7bDc2nWkUhFjZoNIZrqjyNdjlMUndpwREVD7FQ/DuxJMj
FyDHrtlrS80dPUQWNYHw++oACDpWO01LGLPPrGmuO/7cOdojPEd852q5gd+7W9xt
8vUH+pBa6IBLbvBp+szli51V3TLSWcoyy4ceJNQU2vCkTLoFdS0RLd/7tQ==
-----END CERTIFICATE-----"""
# JSON policy document used as the PolicyDocument argument throughout the
# tests below: a single statement allowing s3:ListBucket on example_bucket.
MOCK_POLICY = """
{
"Version": "2012-10-17",
"Statement":
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::example_bucket"
}
}
"""
# Same statement as MOCK_POLICY plus an "Id" of "2", so the parsed document
# is distinguishable when several policy versions are compared.
MOCK_POLICY_2 = """
{
"Version": "2012-10-17",
"Id": "2",
"Statement":
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::example_bucket"
}
}
"""
# Same statement as MOCK_POLICY plus an "Id" of "3".
MOCK_POLICY_3 = """
{
"Version": "2012-10-17",
"Id": "3",
"Statement":
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::example_bucket"
}
}
"""
@mock_iam_deprecated()
def test_get_all_server_certs():
    """Upload one server certificate and check it is the only one listed."""
    iam = boto.connect_iam()
    iam.upload_server_cert("certname", "certbody", "privatekey")

    # Unwrap the nested boto response one level at a time.
    response = iam.get_all_server_certs()
    result = response["list_server_certificates_response"][
        "list_server_certificates_result"
    ]
    listed = result["server_certificate_metadata_list"]
    listed.should.have.length_of(1)

    only_cert = listed[0]
    expected_arn = "arn:aws:iam::{}:server-certificate/certname".format(ACCOUNT_ID)
    only_cert.server_certificate_name.should.equal("certname")
    only_cert.arn.should.equal(expected_arn)
@mock_iam_deprecated()
def test_get_server_cert_doesnt_exist():
    """Fetching a certificate that was never uploaded raises BotoServerError."""
    iam = boto.connect_iam()
    missing_name = "NonExistant"
    with pytest.raises(BotoServerError):
        iam.get_server_certificate(missing_name)
@mock_iam_deprecated()
def test_get_server_cert():
    """Fetch an uploaded certificate by name and check its name and ARN."""
    iam = boto.connect_iam()
    iam.upload_server_cert("certname", "certbody", "privatekey")

    fetched = iam.get_server_certificate("certname")
    expected_arn = "arn:aws:iam::{}:server-certificate/certname".format(ACCOUNT_ID)
    fetched.server_certificate_name.should.equal("certname")
    fetched.arn.should.equal(expected_arn)
@mock_iam_deprecated()
def test_upload_server_cert():
    """Upload a certificate, then read it back to confirm name and ARN."""
    iam = boto.connect_iam()
    cert_name = "certname"
    iam.upload_server_cert(cert_name, "certbody", "privatekey")

    stored = iam.get_server_certificate(cert_name)
    stored.server_certificate_name.should.equal("certname")
    stored.arn.should.equal(
        "arn:aws:iam::{}:server-certificate/certname".format(ACCOUNT_ID)
    )
@mock_iam_deprecated()
def test_delete_server_cert():
    """Delete an uploaded certificate; later get and delete calls both fail."""
    iam = boto.connect_iam()
    cert_name = "certname"
    iam.upload_server_cert(cert_name, "certbody", "privatekey")
    # Succeeds while the certificate still exists.
    iam.get_server_certificate(cert_name)
    iam.delete_server_cert(cert_name)

    # Once deleted, both looking it up and deleting it again must raise.
    for call_on_missing in (iam.get_server_certificate, iam.delete_server_cert):
        with pytest.raises(BotoServerError):
            call_on_missing(cert_name)
@mock_iam_deprecated()
def test_get_role__should_throw__when_role_does_not_exist():
    """Requesting a role that was never created raises BotoServerError.

    Uses ``pytest.raises`` instead of a non-strict ``xfail`` marker: with the
    marker, a call that stopped raising would only be reported as XPASS and
    would not fail the run, and a passing check showed up as an expected
    failure rather than a pass.
    """
    conn = boto.connect_iam()
    with pytest.raises(BotoServerError):
        conn.get_role("unexisting_role")
@mock_iam_deprecated()
def test_get_instance_profile__should_throw__when_instance_profile_does_not_exist():
    """Requesting an instance profile that was never created raises BotoServerError.

    Uses ``pytest.raises`` instead of a non-strict ``xfail`` marker so that
    the test fails outright if the call stops raising, rather than being
    reported as an unexpected pass.
    """
    conn = boto.connect_iam()
    with pytest.raises(BotoServerError):
        conn.get_instance_profile("unexisting_instance_profile")
@mock_iam_deprecated()
def test_create_role_and_instance_profile():
    """Create a role and an instance profile, link them, and read both back.

    Also checks that a profile created without a path reports "/".
    """
    iam = boto.connect_iam()
    iam.create_instance_profile("my-profile", path="my-path")
    iam.create_role(
        "my-role", assume_role_policy_document="some policy", path="/my-path/"
    )
    iam.add_role_to_instance_profile("my-profile", "my-role")

    fetched_role = iam.get_role("my-role")
    fetched_role.path.should.equal("/my-path/")
    fetched_role.assume_role_policy_document.should.equal("some policy")

    fetched_profile = iam.get_instance_profile("my-profile")
    fetched_profile.path.should.equal("my-path")
    attached_roles = list(fetched_profile.roles.values())
    attached_role = attached_roles[0]
    attached_role["role_id"].should.equal(fetched_role.role_id)
    attached_role["role_name"].should.equal("my-role")

    first_listed_role = iam.list_roles().roles[0]
    first_listed_role.role_name.should.equal("my-role")

    # Test with an empty path:
    pathless_profile = iam.create_instance_profile("my-other-profile")
    pathless_profile.path.should.equal("/")
@mock_iam
def test_create_instance_profile_should_throw_when_name_is_not_unique():
    """Creating two instance profiles with the same name raises ClientError."""
    client = boto3.client("iam", region_name="us-east-1")
    profile_name = "unique-instance-profile"
    client.create_instance_profile(InstanceProfileName=profile_name)
    with pytest.raises(ClientError):
        client.create_instance_profile(InstanceProfileName=profile_name)
@mock_iam_deprecated()
def test_remove_role_from_instance_profile():
    """Attach a role to an instance profile, detach it, and see it gone."""
    iam = boto.connect_iam()
    iam.create_instance_profile("my-profile", path="my-path")
    iam.create_role(
        "my-role", assume_role_policy_document="some policy", path="my-path"
    )
    iam.add_role_to_instance_profile("my-profile", "my-role")

    profile_before = iam.get_instance_profile("my-profile")
    roles_before = list(profile_before.roles.values())
    roles_before[0]["role_name"].should.equal("my-role")

    iam.remove_role_from_instance_profile("my-profile", "my-role")

    profile_after = iam.get_instance_profile("my-profile")
    dict(profile_after.roles).should.be.empty
@mock_iam()
def test_delete_instance_profile():
    """Deleting an instance profile conflicts while a role is attached.

    After the role is removed the deletion succeeds, and fetching the
    profile afterwards raises NoSuchEntityException.
    """
    conn = boto3.client("iam", region_name="us-east-1")
    conn.create_role(
        RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="/my-path/",
    )
    conn.create_instance_profile(InstanceProfileName="my-profile")
    conn.add_role_to_instance_profile(
        InstanceProfileName="my-profile", RoleName="my-role"
    )
    # The profile still holds a role, so the delete must be refused.
    with pytest.raises(conn.exceptions.DeleteConflictException):
        conn.delete_instance_profile(InstanceProfileName="my-profile")
    conn.remove_role_from_instance_profile(
        InstanceProfileName="my-profile", RoleName="my-role"
    )
    conn.delete_instance_profile(InstanceProfileName="my-profile")
    # The result was never used (the call is expected to raise), so it is
    # no longer bound to a local.
    with pytest.raises(conn.exceptions.NoSuchEntityException):
        conn.get_instance_profile(InstanceProfileName="my-profile")
@mock_iam()
def test_get_login_profile():
    """A created login profile can be fetched and reports its user name."""
    iam = boto3.client("iam", region_name="us-east-1")
    user_name = "my-user"
    iam.create_user(UserName=user_name)
    iam.create_login_profile(UserName=user_name, Password="my-pass")

    login_profile = iam.get_login_profile(UserName=user_name)["LoginProfile"]
    login_profile["UserName"].should.equal("my-user")
@mock_iam()
def test_update_login_profile():
    """PasswordResetRequired is absent at first and True after an update."""
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName="my-user")
    iam.create_login_profile(UserName="my-user", Password="my-pass")

    def reset_required():
        # Re-read the profile each time so the current stored value is seen.
        fetched = iam.get_login_profile(UserName="my-user")
        return fetched["LoginProfile"].get("PasswordResetRequired")

    reset_required().should.equal(None)
    iam.update_login_profile(
        UserName="my-user", Password="new-pass", PasswordResetRequired=True
    )
    reset_required().should.equal(True)
@mock_iam()
def test_delete_role():
    """Exercise delete_role against each kind of conflict and the clean case.

    Deleting a missing role raises NoSuchEntityException. Deleting a role
    raises DeleteConflictException while it has a managed policy attached,
    an inline policy, or membership in an instance profile; once the
    conflict is removed the delete succeeds and the role can no longer be
    fetched.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    role_name = "my-role"
    profile_name = "my-profile"
    role_spec = {
        "RoleName": role_name,
        "AssumeRolePolicyDocument": "some policy",
        "Path": "/my-path/",
    }

    def expect_delete_conflict():
        with pytest.raises(iam.exceptions.DeleteConflictException):
            iam.delete_role(RoleName=role_name)

    def delete_and_expect_gone():
        iam.delete_role(RoleName=role_name)
        with pytest.raises(iam.exceptions.NoSuchEntityException):
            iam.get_role(RoleName=role_name)

    with pytest.raises(iam.exceptions.NoSuchEntityException):
        iam.delete_role(RoleName=role_name)

    # Test deletion failure with a managed policy
    iam.create_role(**role_spec)
    created_policy = iam.create_policy(
        PolicyName="my-managed-policy", PolicyDocument=MOCK_POLICY
    )
    iam.attach_role_policy(
        PolicyArn=created_policy["Policy"]["Arn"], RoleName=role_name
    )
    expect_delete_conflict()
    iam.detach_role_policy(
        PolicyArn=created_policy["Policy"]["Arn"], RoleName=role_name
    )
    iam.delete_policy(PolicyArn=created_policy["Policy"]["Arn"])
    delete_and_expect_gone()

    # Test deletion failure with an inline policy
    iam.create_role(**role_spec)
    iam.put_role_policy(
        RoleName=role_name, PolicyName="my-role-policy", PolicyDocument=MOCK_POLICY,
    )
    expect_delete_conflict()
    iam.delete_role_policy(RoleName=role_name, PolicyName="my-role-policy")
    delete_and_expect_gone()

    # Test deletion failure with attachment to an instance profile
    iam.create_role(**role_spec)
    iam.create_instance_profile(InstanceProfileName=profile_name)
    iam.add_role_to_instance_profile(
        InstanceProfileName=profile_name, RoleName=role_name
    )
    expect_delete_conflict()
    iam.remove_role_from_instance_profile(
        InstanceProfileName=profile_name, RoleName=role_name
    )
    delete_and_expect_gone()

    # Test deletion with no conflicts
    iam.create_role(**role_spec)
    delete_and_expect_gone()
@mock_iam_deprecated()
def test_list_instance_profiles():
    """The single created profile is listed together with its attached role."""
    iam = boto.connect_iam()
    iam.create_instance_profile("my-profile", path="my-path")
    iam.create_role("my-role", path="my-path")
    iam.add_role_to_instance_profile("my-profile", "my-role")

    listed = iam.list_instance_profiles().instance_profiles
    len(listed).should.equal(1)

    only_profile = listed[0]
    only_profile.instance_profile_name.should.equal("my-profile")
    only_profile.roles.role_name.should.equal("my-role")
@mock_iam_deprecated()
def test_list_instance_profiles_for_role():
    """List instance profiles per role.

    "my-role" is added to both profiles and must list exactly those two;
    "my-role2" is added to none and must list nothing.
    """
    iam = boto.connect_iam()
    iam.create_role(
        role_name="my-role", assume_role_policy_document="some policy", path="my-path",
    )
    iam.create_role(
        role_name="my-role2",
        assume_role_policy_document="some policy2",
        path="my-path2",
    )

    pending_names = ["my-profile", "my-profile2"]
    pending_paths = ["my-path", "my-path2"]
    for name, path in zip(pending_names, pending_paths):
        iam.create_instance_profile(name, path=path)
    for name in pending_names:
        iam.add_role_to_instance_profile(name, "my-role")

    first_dump = iam.list_instance_profiles_for_role(role_name="my-role")
    first_result = first_dump["list_instance_profiles_for_role_response"][
        "list_instance_profiles_for_role_result"
    ]
    listed = first_result["instance_profiles"]

    # Tick off each returned profile; both lists must end up empty.
    for entry in listed:
        pending_names.remove(entry["instance_profile_name"])
        pending_paths.remove(entry["path"])
        entry["roles"]["member"]["role_name"].should.equal("my-role")
    len(pending_names).should.equal(0)
    len(pending_paths).should.equal(0)

    second_dump = iam.list_instance_profiles_for_role(role_name="my-role2")
    second_result = second_dump["list_instance_profiles_for_role_response"][
        "list_instance_profiles_for_role_result"
    ]
    listed = second_result["instance_profiles"]
    len(listed).should.equal(0)
@mock_iam_deprecated()
def test_list_role_policies():
    """Inline role policies are listed as added and disappear when deleted.

    Deleting an already-deleted policy raises BotoServerError.
    """
    iam = boto.connect_iam()
    role_name = "my-role"
    iam.create_role(role_name)

    iam.put_role_policy(role_name, "test policy", MOCK_POLICY)
    names = iam.list_role_policies(role_name).policy_names
    names.should.have.length_of(1)
    names[0].should.equal("test policy")

    iam.put_role_policy(role_name, "test policy 2", MOCK_POLICY)
    names = iam.list_role_policies(role_name).policy_names
    names.should.have.length_of(2)

    iam.delete_role_policy(role_name, "test policy")
    names = iam.list_role_policies(role_name).policy_names
    names.should.have.length_of(1)
    names[0].should.equal("test policy 2")

    with pytest.raises(BotoServerError):
        iam.delete_role_policy(role_name, "test policy")
@mock_iam_deprecated()
def test_put_role_policy():
    """A policy put on a role can be fetched back under the same name."""
    iam = boto.connect_iam()
    iam.create_role(
        "my-role", assume_role_policy_document="some policy", path="my-path"
    )
    iam.put_role_policy("my-role", "test policy", MOCK_POLICY)

    response = iam.get_role_policy("my-role", "test policy")
    result = response["get_role_policy_response"]["get_role_policy_result"]
    stored_name = result["policy_name"]
    stored_name.should.equal("test policy")
@mock_iam
def test_get_role_policy():
    """Fetching an unknown inline policy of an existing role raises NoSuchEntity."""
    iam = boto3.client("iam", region_name="us-east-1")
    role_name = "my-role"
    iam.create_role(
        RoleName=role_name, AssumeRolePolicyDocument="some policy", Path="my-path",
    )
    with pytest.raises(iam.exceptions.NoSuchEntityException):
        iam.get_role_policy(RoleName=role_name, PolicyName="does-not-exist")
@mock_iam_deprecated()
def test_update_assume_role_policy():
    """Updating the assume-role policy is visible on the next get_role."""
    iam = boto.connect_iam()
    created = iam.create_role("my-role")
    iam.update_assume_role_policy(created.role_name, "my-policy")

    refreshed = iam.get_role("my-role")
    refreshed.assume_role_policy_document.should.equal("my-policy")
@mock_iam
def test_create_policy():
    """create_policy returns the ARN built from the account id and policy name."""
    iam = boto3.client("iam", region_name="us-east-1")
    created = iam.create_policy(
        PolicyName="TestCreatePolicy", PolicyDocument=MOCK_POLICY
    )
    expected_arn = "arn:aws:iam::{}:policy/TestCreatePolicy".format(ACCOUNT_ID)
    created["Policy"]["Arn"].should.equal(expected_arn)
@mock_iam
def test_create_policy_already_exists():
    """Creating a policy twice under one name raises EntityAlreadyExists.

    The error is checked for its code, an HTTP 409 status, and a message
    that mentions the policy name.
    """
    conn = boto3.client("iam", region_name="us-east-1")
    # Neither response was ever read, so the results are no longer bound.
    conn.create_policy(PolicyName="TestCreatePolicy", PolicyDocument=MOCK_POLICY)
    with pytest.raises(conn.exceptions.EntityAlreadyExistsException) as ex:
        conn.create_policy(PolicyName="TestCreatePolicy", PolicyDocument=MOCK_POLICY)
    # Inspect the captured error only after the context manager has exited.
    ex.value.response["Error"]["Code"].should.equal("EntityAlreadyExists")
    ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(409)
    ex.value.response["Error"]["Message"].should.contain("TestCreatePolicy")
@mock_iam
def test_delete_policy():
    """A created local policy is listed, and is gone from the list once deleted."""
    conn = boto3.client("iam", region_name="us-east-1")
    response = conn.create_policy(
        PolicyName="TestCreatePolicy", PolicyDocument=MOCK_POLICY
    )
    [
        pol["PolicyName"] for pol in conn.list_policies(Scope="Local")["Policies"]
    ].should.equal(["TestCreatePolicy"])
    conn.delete_policy(PolicyArn=response["Policy"]["Arn"])
    # The sure expression raises by itself on failure. Wrapping it in
    # `assert` added nothing and made the whole check disappear under
    # `python -O`, so it is now a plain statement.
    conn.list_policies(Scope="Local")["Policies"].should.be.empty
@mock_iam
def test_create_policy_versions():
    """Create policy versions and check their ids and default flags.

    Creating a version for a policy that does not exist raises ClientError.
    A version created with SetAsDefault=True is "v2" and is the default; after
    "v1" is deleted, the next version is "v3" and is not the default.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    policy_arn = "arn:aws:iam::{}:policy/TestCreatePolicyVersion".format(ACCOUNT_ID)

    with pytest.raises(ClientError):
        iam.create_policy_version(
            PolicyArn=policy_arn, PolicyDocument='{"some":"policy"}',
        )

    iam.create_policy(PolicyName="TestCreatePolicyVersion", PolicyDocument=MOCK_POLICY)
    response = iam.create_policy_version(
        PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY, SetAsDefault=True,
    )
    second_version = response.get("PolicyVersion")
    second_version.get("Document").should.equal(json.loads(MOCK_POLICY))
    second_version.get("VersionId").should.equal("v2")
    second_version.get("IsDefaultVersion").should.be.ok

    iam.delete_policy_version(PolicyArn=policy_arn, VersionId="v1")
    response = iam.create_policy_version(
        PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY,
    )
    third_version = response.get("PolicyVersion")
    third_version.get("VersionId").should.equal("v3")
    third_version.get("IsDefaultVersion").shouldnt.be.ok
@mock_iam
def test_create_many_policy_versions():
    """After creating a policy and four more versions, one more raises ClientError."""
    iam = boto3.client("iam", region_name="us-east-1")
    policy_arn = "arn:aws:iam::{}:policy/TestCreateManyPolicyVersions".format(
        ACCOUNT_ID
    )
    iam.create_policy(
        PolicyName="TestCreateManyPolicyVersions", PolicyDocument=MOCK_POLICY
    )

    extra_versions = 4
    for _ in range(extra_versions):
        iam.create_policy_version(PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY)

    with pytest.raises(ClientError):
        iam.create_policy_version(PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY)
@mock_iam
def test_set_default_policy_version():
    """Switch the default policy version and check the rejected inputs.

    Three versions are created, the last two with SetAsDefault=True, so v3
    is the default at first. After set_default_policy_version("v1") only v1
    is the default. The call is then expected to raise ClientError for an
    unknown policy ARN, a malformed version id, and a version that does not
    exist.
    """
    conn = boto3.client("iam", region_name="us-east-1")
    policy_arn = "arn:aws:iam::{}:policy/TestSetDefaultPolicyVersion".format(
        ACCOUNT_ID
    )
    # Documents in version order: index 0 is v1, index 1 is v2, index 2 is v3.
    documents = [MOCK_POLICY, MOCK_POLICY_2, MOCK_POLICY_3]

    def check_versions(default_index):
        """Check each listed version's document and that only one is default."""
        listed = conn.list_policy_versions(PolicyArn=policy_arn).get("Versions")
        for index, document in enumerate(documents):
            listed[index].get("Document").should.equal(json.loads(document))
            is_default = listed[index].get("IsDefaultVersion")
            if index == default_index:
                is_default.should.be.ok
            else:
                is_default.shouldnt.be.ok

    conn.create_policy(
        PolicyName="TestSetDefaultPolicyVersion", PolicyDocument=MOCK_POLICY
    )
    conn.create_policy_version(
        PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY_2, SetAsDefault=True,
    )
    conn.create_policy_version(
        PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY_3, SetAsDefault=True,
    )
    check_versions(default_index=2)

    conn.set_default_policy_version(PolicyArn=policy_arn, VersionId="v1")
    check_versions(default_index=0)

    # Set default version for non-existing policy
    conn.set_default_policy_version.when.called_with(
        PolicyArn="arn:aws:iam::{}:policy/TestNonExistingPolicy".format(ACCOUNT_ID),
        VersionId="v1",
    ).should.throw(
        ClientError,
        "Policy arn:aws:iam::{}:policy/TestNonExistingPolicy not found".format(
            ACCOUNT_ID
        ),
    )

    # Set default version for incorrect version
    conn.set_default_policy_version.when.called_with(
        PolicyArn=policy_arn, VersionId="wrong_version_id",
    ).should.throw(
        ClientError,
        # Raw string: "\." is not a valid escape in a normal literal and
        # triggers an invalid-escape warning. The text itself is unchanged.
        r"Value 'wrong_version_id' at 'versionId' failed to satisfy constraint: Member must satisfy regular expression pattern: v[1-9][0-9]*(\.[A-Za-z0-9-]*)?",
    )

    # Set default version for non-existing version
    conn.set_default_policy_version.when.called_with(
        PolicyArn=policy_arn, VersionId="v4",
    ).should.throw(
        ClientError,
        "Policy arn:aws:iam::{}:policy/TestSetDefaultPolicyVersion version v4 does not exist or is not attachable.".format(
            ACCOUNT_ID
        ),
    )
@mock_iam
def test_get_policy():
    """A created policy can be fetched by ARN and reports that same ARN."""
    conn = boto3.client("iam", region_name="us-east-1")
    policy_arn = "arn:aws:iam::{}:policy/TestGetPolicy".format(ACCOUNT_ID)
    # The create response was never read, so it is no longer bound to a local.
    conn.create_policy(PolicyName="TestGetPolicy", PolicyDocument=MOCK_POLICY)
    policy = conn.get_policy(PolicyArn=policy_arn)
    policy["Policy"]["Arn"].should.equal(policy_arn)
@mock_iam
def test_get_aws_managed_policy():
    """The AWS-managed IAMUserChangePassword policy reports the expected ARN and date."""
    iam = boto3.client("iam", region_name="us-east-1")
    arn = "arn:aws:iam::aws:policy/IAMUserChangePassword"
    expected_created = datetime.strptime(
        "2016-11-15T00:25:16+00:00", "%Y-%m-%dT%H:%M:%S+00:00"
    )

    fetched = iam.get_policy(PolicyArn=arn)
    fetched["Policy"]["Arn"].should.equal(arn)
    # Drop the tzinfo so the value compares against the naive expected datetime.
    created_naive = fetched["Policy"]["CreateDate"].replace(tzinfo=None)
    created_naive.should.equal(expected_created)
@mock_iam
def test_get_policy_version():
    """Fetch a policy version by id; an unknown id raises ClientError."""
    iam = boto3.client("iam", region_name="us-east-1")
    policy_arn = "arn:aws:iam::{}:policy/TestGetPolicyVersion".format(ACCOUNT_ID)
    iam.create_policy(PolicyName="TestGetPolicyVersion", PolicyDocument=MOCK_POLICY)
    created = iam.create_policy_version(
        PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY,
    )

    with pytest.raises(ClientError):
        iam.get_policy_version(PolicyArn=policy_arn, VersionId="v2-does-not-exist")

    created_id = created.get("PolicyVersion").get("VersionId")
    fetched = iam.get_policy_version(PolicyArn=policy_arn, VersionId=created_id)
    fetched_version = fetched.get("PolicyVersion")
    fetched_version.get("Document").should.equal(json.loads(MOCK_POLICY))
    fetched_version.get("IsDefaultVersion").shouldnt.be.ok
@mock_iam
def test_get_aws_managed_policy_version():
    """v1 of AWSLambdaBasicExecutionRole has the expected date and a dict document.

    An unknown version id raises ClientError.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
    expected_created = datetime.strptime(
        "2015-04-09T15:03:43+00:00", "%Y-%m-%dT%H:%M:%S+00:00"
    )

    with pytest.raises(ClientError):
        iam.get_policy_version(PolicyArn=arn, VersionId="v2-does-not-exist")

    fetched = iam.get_policy_version(PolicyArn=arn, VersionId="v1")
    created_naive = fetched["PolicyVersion"]["CreateDate"].replace(tzinfo=None)
    created_naive.should.equal(expected_created)
    fetched["PolicyVersion"]["Document"].should.be.an(dict)
@mock_iam
def test_get_aws_managed_policy_v4_version():
    """v4 of SystemAdministrator has the expected date and a dict document.

    An unknown version id raises ClientError.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    arn = "arn:aws:iam::aws:policy/job-function/SystemAdministrator"
    expected_created = datetime.strptime(
        "2018-10-08T21:33:45+00:00", "%Y-%m-%dT%H:%M:%S+00:00"
    )

    with pytest.raises(ClientError):
        iam.get_policy_version(PolicyArn=arn, VersionId="v2-does-not-exist")

    fetched = iam.get_policy_version(PolicyArn=arn, VersionId="v4")
    created_naive = fetched["PolicyVersion"]["CreateDate"].replace(tzinfo=None)
    created_naive.should.equal(expected_created)
    fetched["PolicyVersion"]["Document"].should.be.an(dict)
@mock_iam
def test_list_policy_versions():
    """List policy versions before and after adding two more.

    Listing versions of a policy that does not exist raises ClientError. A
    fresh policy lists "v1" as the default; versions added afterwards keep
    their own documents and are not the default.
    """
    conn = boto3.client("iam", region_name="us-east-1")
    policy_arn = "arn:aws:iam::{}:policy/TestListPolicyVersions".format(ACCOUNT_ID)

    # The call is expected to raise, so its result is no longer bound to a
    # local that was never used.
    with pytest.raises(ClientError):
        conn.list_policy_versions(PolicyArn=policy_arn)

    conn.create_policy(PolicyName="TestListPolicyVersions", PolicyDocument=MOCK_POLICY)
    versions = conn.list_policy_versions(PolicyArn=policy_arn)
    versions.get("Versions")[0].get("VersionId").should.equal("v1")
    versions.get("Versions")[0].get("IsDefaultVersion").should.be.ok

    conn.create_policy_version(PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY_2)
    conn.create_policy_version(PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY_3)
    versions = conn.list_policy_versions(PolicyArn=policy_arn)
    versions.get("Versions")[1].get("Document").should.equal(json.loads(MOCK_POLICY_2))
    versions.get("Versions")[1].get("IsDefaultVersion").shouldnt.be.ok
    versions.get("Versions")[2].get("Document").should.equal(json.loads(MOCK_POLICY_3))
    versions.get("Versions")[2].get("IsDefaultVersion").shouldnt.be.ok
@mock_iam
def test_delete_policy_version():
    """Deleting "v2" leaves one version; deleting an unknown id raises ClientError."""
    iam = boto3.client("iam", region_name="us-east-1")
    policy_arn = "arn:aws:iam::{}:policy/TestDeletePolicyVersion".format(ACCOUNT_ID)
    iam.create_policy(PolicyName="TestDeletePolicyVersion", PolicyDocument=MOCK_POLICY)
    iam.create_policy_version(PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY)

    with pytest.raises(ClientError):
        iam.delete_policy_version(
            PolicyArn=policy_arn, VersionId="v2-nope-this-does-not-exist",
        )

    iam.delete_policy_version(PolicyArn=policy_arn, VersionId="v2")
    remaining = iam.list_policy_versions(PolicyArn=policy_arn).get("Versions")
    len(remaining).should.equal(1)
@mock_iam
def test_delete_default_policy_version():
    """Deleting "v1" while a second, non-default version exists raises ClientError."""
    iam = boto3.client("iam", region_name="us-east-1")
    policy_arn = "arn:aws:iam::{}:policy/TestDeletePolicyVersion".format(ACCOUNT_ID)
    iam.create_policy(PolicyName="TestDeletePolicyVersion", PolicyDocument=MOCK_POLICY)
    iam.create_policy_version(PolicyArn=policy_arn, PolicyDocument=MOCK_POLICY_2)

    with pytest.raises(ClientError):
        iam.delete_policy_version(PolicyArn=policy_arn, VersionId="v1")
@mock_iam_deprecated()
def test_create_user():
    """Creating the same user name a second time raises BotoServerError."""
    iam = boto.connect_iam()
    user_name = "my-user"
    iam.create_user(user_name)
    with pytest.raises(BotoServerError):
        iam.create_user(user_name)
@mock_iam_deprecated()
def test_get_user():
    """get_user raises before the user exists and succeeds once it is created."""
    iam = boto.connect_iam()
    user_name = "my-user"
    with pytest.raises(BotoServerError):
        iam.get_user(user_name)
    iam.create_user(user_name)
    iam.get_user(user_name)
@mock_iam()
def test_update_user():
    """Rename a user and move its path; the old name stops resolving.

    Updating a user that does not exist raises NoSuchEntityException.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    old_name = "my-user"
    new_name = "new-user"

    with pytest.raises(iam.exceptions.NoSuchEntityException):
        iam.update_user(UserName=old_name)

    iam.create_user(UserName=old_name)
    iam.update_user(UserName=old_name, NewPath="/new-path/", NewUserName=new_name)

    renamed = iam.get_user(UserName=new_name)["User"]
    renamed.get("Path").should.equal("/new-path/")
    with pytest.raises(iam.exceptions.NoSuchEntityException):
        iam.get_user(UserName=old_name)
@mock_iam_deprecated()
def test_get_current_user():
    """If no user is specified, IAM returns the current user."""
    iam = boto.connect_iam()
    result = iam.get_user()["get_user_response"]["get_user_result"]
    current_user = result["user"]
    current_user["user_name"].should.equal("default_user")
@mock_iam()
def test_list_users():
    """List users by path prefix and check name, path and ARN of the first hit."""
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName="my-user")

    first_listing = iam.list_users(PathPrefix="/", MaxItems=10)
    root_user = first_listing["Users"][0]
    root_user["UserName"].should.equal("my-user")
    root_user["Path"].should.equal("/")
    root_user["Arn"].should.equal("arn:aws:iam::{}:user/my-user".format(ACCOUNT_ID))

    # A second user under another path is the first result for that prefix.
    iam.create_user(UserName="my-user-1", Path="myUser")
    second_listing = iam.list_users(PathPrefix="my")
    prefixed_user = second_listing["Users"][0]
    prefixed_user["UserName"].should.equal("my-user-1")
    prefixed_user["Path"].should.equal("myUser")
@mock_iam()
def test_user_policies():
    """Put, get, list and delete an inline user policy."""
    iam = boto3.client("iam", region_name="us-east-1")
    user = "my-user"
    policy = "UserManagedPolicy"
    iam.create_user(UserName=user)
    iam.put_user_policy(UserName=user, PolicyName=policy, PolicyDocument=MOCK_POLICY)

    fetched = iam.get_user_policy(UserName=user, PolicyName=policy)
    fetched["PolicyDocument"].should.equal(json.loads(MOCK_POLICY))

    names = iam.list_user_policies(UserName=user)["PolicyNames"]
    len(names).should.equal(1)
    names[0].should.equal(policy)

    iam.delete_user_policy(UserName=user, PolicyName=policy)
    names = iam.list_user_policies(UserName=user)["PolicyNames"]
    len(names).should.equal(0)
@mock_iam_deprecated()
def test_create_login_profile():
    """create_login_profile raises for an unknown user and for a second profile."""
    iam = boto.connect_iam()
    user, password = "my-user", "my-pass"

    # The user does not exist yet.
    with pytest.raises(BotoServerError):
        iam.create_login_profile(user, password)

    iam.create_user(user)
    iam.create_login_profile(user, password)

    # The user already has a login profile.
    with pytest.raises(BotoServerError):
        iam.create_login_profile(user, password)
@mock_iam_deprecated()
def test_delete_login_profile():
    """delete_login_profile raises when there is no profile, then succeeds once one exists."""
    iam = boto.connect_iam()
    user = "my-user"
    iam.create_user(user)
    with pytest.raises(BotoServerError):
        iam.delete_login_profile(user)
    iam.create_login_profile(user, "my-pass")
    iam.delete_login_profile(user)
@mock_iam
def test_create_access_key():
    """Create access keys both by user name and through a key-authenticated client.

    Creating a key for an unknown user raises ClientError. Each created key
    is checked for a recent creation time, id and secret lengths, and the
    "AKIA" id prefix. The second key is created without naming a user, from
    a client configured with the first key's credentials.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    with pytest.raises(ClientError):
        iam.create_access_key(UserName="my-user")
    iam.create_user(UserName="my-user")

    def check_key(key):
        age = datetime.utcnow() - key["CreateDate"].replace(tzinfo=None)
        age.seconds.should.be.within(0, 10)
        key["AccessKeyId"].should.have.length_of(20)
        key["SecretAccessKey"].should.have.length_of(40)
        assert key["AccessKeyId"].startswith("AKIA")

    first_key = iam.create_access_key(UserName="my-user")["AccessKey"]
    check_key(first_key)

    keyed_client = boto3.client(
        "iam",
        region_name="us-east-1",
        aws_access_key_id=first_key["AccessKeyId"],
        aws_secret_access_key=first_key["SecretAccessKey"],
    )
    second_key = keyed_client.create_access_key()["AccessKey"]
    check_key(second_key)
@mock_iam_deprecated()
def test_get_all_access_keys():
    """If no access keys exist there should be none in the response,
    if an access key is present it should have the correct fields present"""
    iam = boto.connect_iam()
    iam.create_user("my-user")

    def key_metadata():
        # Unwrap the nested boto response down to the metadata list.
        listing = iam.get_all_access_keys("my-user")
        result = listing["list_access_keys_response"]["list_access_keys_result"]
        return result["access_key_metadata"]

    assert key_metadata() == []

    iam.create_access_key("my-user")
    expected_fields = sorted(["status", "create_date", "user_name", "access_key_id"])
    assert sorted(key_metadata()[0].keys()) == expected_fields
@mock_iam
def test_list_access_keys():
    """list_access_keys is empty at first, then returns the expected metadata fields.

    The listing is checked both by user name and, without a user name,
    through a client configured with the created key's credentials.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName="my-user")
    expected_fields = sorted(["Status", "CreateDate", "UserName", "AccessKeyId"])

    listing = iam.list_access_keys(UserName="my-user")
    assert listing["AccessKeyMetadata"] == []

    new_key = iam.create_access_key(UserName="my-user")["AccessKey"]
    listing = iam.list_access_keys(UserName="my-user")
    assert sorted(listing["AccessKeyMetadata"][0].keys()) == expected_fields

    keyed_client = boto3.client(
        "iam",
        region_name="us-east-1",
        aws_access_key_id=new_key["AccessKeyId"],
        aws_secret_access_key=new_key["SecretAccessKey"],
    )
    listing = keyed_client.list_access_keys()
    assert sorted(listing["AccessKeyMetadata"][0].keys()) == expected_fields
@mock_iam_deprecated()
def test_delete_access_key_deprecated():
    """An access key created through boto can be deleted by id and user name."""
    iam = boto.connect_iam()
    iam.create_user("my-user")

    created = iam.create_access_key("my-user")
    result = created["create_access_key_response"]["create_access_key_result"]
    key_id = result["access_key"]["access_key_id"]
    iam.delete_access_key(key_id, "my-user")
@mock_iam
def test_delete_access_key():
    """An access key can be deleted with or without passing the user name."""
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName="my-user")

    first_key = iam.create_access_key(UserName="my-user")["AccessKey"]
    iam.delete_access_key(AccessKeyId=first_key["AccessKeyId"], UserName="my-user")

    second_key = iam.create_access_key(UserName="my-user")["AccessKey"]
    iam.delete_access_key(AccessKeyId=second_key["AccessKeyId"])
@mock_iam()
def test_mfa_devices():
    """Enable an MFA device for a user, list it, then deactivate it."""
    iam = boto3.client("iam", region_name="us-east-1")
    user = "my-user"
    serial = "123456789"

    # Test enable device
    iam.create_user(UserName=user)
    iam.enable_mfa_device(
        UserName=user,
        SerialNumber=serial,
        AuthenticationCode1="234567",
        AuthenticationCode2="987654",
    )

    # Test list mfa devices
    devices = iam.list_mfa_devices(UserName=user)["MFADevices"]
    devices[0]["SerialNumber"].should.equal("123456789")

    # Test deactivate mfa device
    iam.deactivate_mfa_device(UserName=user, SerialNumber=serial)
    devices = iam.list_mfa_devices(UserName=user)["MFADevices"]
    len(devices).should.equal(0)
@mock_iam
def test_create_virtual_mfa_device():
    """Create virtual MFA devices with no path, path "/" and path "/test/".

    Each response must carry the serial-number ARN for that path and name, a
    seed that matches the character class below, and a non-empty QR code.
    """
    iam = boto3.client("iam", region_name="us-east-1")

    # (arguments for create_virtual_mfa_device, expected serial-number template)
    cases = [
        ({"VirtualMFADeviceName": "test-device"}, "arn:aws:iam::{}:mfa/test-device"),
        (
            {"Path": "/", "VirtualMFADeviceName": "test-device-2"},
            "arn:aws:iam::{}:mfa/test-device-2",
        ),
        (
            {"Path": "/test/", "VirtualMFADeviceName": "test-device"},
            "arn:aws:iam::{}:mfa/test/test-device",
        ),
    ]
    for create_args, serial_template in cases:
        created = iam.create_virtual_mfa_device(**create_args)
        device = created["VirtualMFADevice"]
        device["SerialNumber"].should.equal(serial_template.format(ACCOUNT_ID))
        device["Base32StringSeed"].decode("ascii").should.match("[A-Z234567]")
        device["QRCodePNG"].should_not.be.empty
@mock_iam
def test_create_virtual_mfa_device_errors():
    """Check the errors raised for duplicate devices and invalid paths."""
    device_name = "test-device"
    client = boto3.client("iam", region_name="us-east-1")
    client.create_virtual_mfa_device(VirtualMFADeviceName=device_name)

    # Creating the same device a second time is rejected.
    client.create_virtual_mfa_device.when.called_with(
        VirtualMFADeviceName=device_name
    ).should.throw(
        ClientError, "MFADevice entity at the same path and name already exists."
    )

    # Paths without surrounding slashes, or with an empty segment, are invalid.
    invalid_path_message = (
        "The specified value for path is invalid. "
        "It must begin and end with / and contain only alphanumeric characters and/or / characters."
    )
    for bad_path in ("test", "/test//test/"):
        client.create_virtual_mfa_device.when.called_with(
            Path=bad_path, VirtualMFADeviceName=device_name
        ).should.throw(ClientError, invalid_path_message)

    # A path of 513 characters exceeds the 512-character limit.
    # NOTE(review): the "{}" placeholder below is never formatted with the
    # path; presumably the backend emits it literally - confirm before changing.
    too_long_path = "/{}/".format("b" * 511)
    too_long_message = (
        "1 validation error detected: "
        'Value "{}" at "path" failed to satisfy constraint: '
        "Member must have length less than or equal to 512"
    )
    client.create_virtual_mfa_device.when.called_with(
        Path=too_long_path, VirtualMFADeviceName=device_name
    ).should.throw(ClientError, too_long_message)
@mock_iam
def test_delete_virtual_mfa_device():
    """A deleted virtual MFA device no longer appears in the listing."""
    client = boto3.client("iam", region_name="us-east-1")
    created = client.create_virtual_mfa_device(VirtualMFADeviceName="test-device")
    serial = created["VirtualMFADevice"]["SerialNumber"]

    client.delete_virtual_mfa_device(SerialNumber=serial)

    listing = client.list_virtual_mfa_devices()
    listing["VirtualMFADevices"].should.have.length_of(0)
    listing["IsTruncated"].should_not.be.ok
@mock_iam
def test_delete_virtual_mfa_device_errors():
    """Deleting an unknown virtual MFA device raises a ClientError."""
    client = boto3.client("iam", region_name="us-east-1")
    missing_serial = "arn:aws:iam::{}:mfa/not-existing".format(ACCOUNT_ID)
    expected_message = "VirtualMFADevice with serial number {0} doesn't exist.".format(
        missing_serial
    )

    client.delete_virtual_mfa_device.when.called_with(
        SerialNumber=missing_serial
    ).should.throw(ClientError, expected_message)
@mock_iam
def test_list_virtual_mfa_devices():
    """List virtual MFA devices using assignment filters and pagination."""
    client = boto3.client("iam", region_name="us-east-1")

    root_device = client.create_virtual_mfa_device(
        VirtualMFADeviceName="test-device"
    )
    nested_device = client.create_virtual_mfa_device(
        Path="/test/", VirtualMFADeviceName="test-device"
    )
    first_entry = {"SerialNumber": root_device["VirtualMFADevice"]["SerialNumber"]}
    second_entry = {
        "SerialNumber": nested_device["VirtualMFADevice"]["SerialNumber"]
    }

    # Without a filter both devices are returned.
    listing = client.list_virtual_mfa_devices()
    listing["VirtualMFADevices"].should.equal([first_entry, second_entry])
    listing["IsTruncated"].should_not.be.ok

    # Neither device has been assigned to a user.
    listing = client.list_virtual_mfa_devices(AssignmentStatus="Assigned")
    listing["VirtualMFADevices"].should.have.length_of(0)
    listing["IsTruncated"].should_not.be.ok

    listing = client.list_virtual_mfa_devices(AssignmentStatus="Unassigned")
    listing["VirtualMFADevices"].should.equal([first_entry, second_entry])
    listing["IsTruncated"].should_not.be.ok

    # Page through the devices one item at a time.
    first_page = client.list_virtual_mfa_devices(
        AssignmentStatus="Any", MaxItems=1
    )
    first_page["VirtualMFADevices"].should.equal([first_entry])
    first_page["IsTruncated"].should.be.ok
    first_page["Marker"].should.equal("1")

    second_page = client.list_virtual_mfa_devices(
        AssignmentStatus="Any", Marker=first_page["Marker"]
    )
    second_page["VirtualMFADevices"].should.equal([second_entry])
    second_page["IsTruncated"].should_not.be.ok
@mock_iam
def test_list_virtual_mfa_devices_errors():
    """Listing with Marker="100" while one device exists is rejected."""
    client = boto3.client("iam", region_name="us-east-1")
    client.create_virtual_mfa_device(VirtualMFADeviceName="test-device")

    list_with_bad_marker = client.list_virtual_mfa_devices.when.called_with(
        Marker="100"
    )
    list_with_bad_marker.should.throw(ClientError, "Invalid Marker.")
@mock_iam
def test_enable_virtual_mfa_device():
    """Assign a virtual MFA device to a user and then unassign it again."""
    user_name = "test-user"
    client = boto3.client("iam", region_name="us-east-1")
    created = client.create_virtual_mfa_device(VirtualMFADeviceName="test-device")
    serial_number = created["VirtualMFADevice"]["SerialNumber"]

    client.create_user(UserName=user_name)
    client.enable_mfa_device(
        UserName=user_name,
        SerialNumber=serial_number,
        AuthenticationCode1="234567",
        AuthenticationCode2="987654",
    )

    # Once enabled, the device is no longer reported as unassigned ...
    unassigned = client.list_virtual_mfa_devices(AssignmentStatus="Unassigned")
    unassigned["VirtualMFADevices"].should.have.length_of(0)
    unassigned["IsTruncated"].should_not.be.ok

    # ... and the assigned listing carries the user details.
    assigned = client.list_virtual_mfa_devices(AssignmentStatus="Assigned")
    device = assigned["VirtualMFADevices"][0]
    device["SerialNumber"].should.equal(serial_number)
    owner = device["User"]
    owner["Path"].should.equal("/")
    owner["UserName"].should.equal(user_name)
    owner["UserId"].should_not.be.empty
    owner["Arn"].should.equal("arn:aws:iam::{}:user/test-user".format(ACCOUNT_ID))
    owner["CreateDate"].should.be.a(datetime)
    device["EnableDate"].should.be.a(datetime)
    assigned["IsTruncated"].should_not.be.ok

    # Deactivating moves the device back to the unassigned listing.
    client.deactivate_mfa_device(UserName=user_name, SerialNumber=serial_number)

    assigned = client.list_virtual_mfa_devices(AssignmentStatus="Assigned")
    assigned["VirtualMFADevices"].should.have.length_of(0)
    assigned["IsTruncated"].should_not.be.ok

    unassigned = client.list_virtual_mfa_devices(AssignmentStatus="Unassigned")
    unassigned["VirtualMFADevices"].should.equal([{"SerialNumber": serial_number}])
    unassigned["IsTruncated"].should_not.be.ok
@mock_iam_deprecated()
def test_delete_user_deprecated():
    """Deleting a user via legacy boto fails until the user exists."""
    user_name = "my-user"
    iam = boto.connect_iam()

    # The user has not been created yet, so deletion must fail.
    with pytest.raises(BotoServerError):
        iam.delete_user(user_name)

    iam.create_user(user_name)
    iam.delete_user(user_name)
@mock_iam()
def test_delete_user():
    """Delete users with and without attached or inline policies."""
    user_name = "my-user"
    client = boto3.client("iam", region_name="us-east-1")
    no_such_entity = client.exceptions.NoSuchEntityException
    delete_conflict = client.exceptions.DeleteConflictException

    # Deleting a user that was never created fails.
    with pytest.raises(no_such_entity):
        client.delete_user(UserName=user_name)

    # Deletion is refused while a managed policy is attached.
    client.create_user(UserName=user_name)
    created_policy = client.create_policy(
        PolicyName="my-managed-policy", PolicyDocument=MOCK_POLICY
    )
    policy_arn = created_policy["Policy"]["Arn"]
    client.attach_user_policy(PolicyArn=policy_arn, UserName=user_name)
    with pytest.raises(delete_conflict):
        client.delete_user(UserName=user_name)
    client.detach_user_policy(PolicyArn=policy_arn, UserName=user_name)
    client.delete_policy(PolicyArn=policy_arn)
    client.delete_user(UserName=user_name)
    with pytest.raises(no_such_entity):
        client.get_user(UserName=user_name)

    # Deletion is refused while an inline policy exists.
    client.create_user(UserName=user_name)
    client.put_user_policy(
        UserName=user_name, PolicyName="my-user-policy", PolicyDocument=MOCK_POLICY
    )
    with pytest.raises(delete_conflict):
        client.delete_user(UserName=user_name)
    client.delete_user_policy(UserName=user_name, PolicyName="my-user-policy")
    client.delete_user(UserName=user_name)
    with pytest.raises(no_such_entity):
        client.get_user(UserName=user_name)

    # Without any conflicts the deletion succeeds straight away.
    client.create_user(UserName=user_name)
    client.delete_user(UserName=user_name)
    with pytest.raises(no_such_entity):
        client.get_user(UserName=user_name)
@mock_iam_deprecated()
def test_generate_credential_report():
    """The first generate call reports STARTED, the second COMPLETE (boto)."""
    iam = boto.connect_iam()

    for expected_state in ("STARTED", "COMPLETE"):
        outcome = iam.generate_credential_report()
        report_result = outcome["generate_credential_report_response"][
            "generate_credential_report_result"
        ]
        report_result["state"].should.equal(expected_state)
@mock_iam
def test_boto3_generate_credential_report():
    """The first generate call reports STARTED, the second COMPLETE (boto3)."""
    client = boto3.client("iam", region_name="us-east-1")

    for expected_state in ("STARTED", "COMPLETE"):
        outcome = client.generate_credential_report()
        outcome["State"].should.equal(expected_state)
@mock_iam_deprecated()
def test_get_credential_report():
    """Fetch the credential report via legacy boto once generation completes."""
    iam = boto.connect_iam()
    iam.create_user("my-user")

    # No report has been generated yet, so fetching it fails.
    with pytest.raises(BotoServerError):
        iam.get_credential_report()

    def report_state(generated):
        """Extract the state field from a generate_credential_report reply."""
        return generated["generate_credential_report_response"][
            "generate_credential_report_result"
        ]["state"]

    # Keep requesting generation until the report is marked complete.
    generated = iam.generate_credential_report()
    while report_state(generated) != "COMPLETE":
        generated = iam.generate_credential_report()

    fetched = iam.get_credential_report()
    encoded = fetched["get_credential_report_response"][
        "get_credential_report_result"
    ]["content"]
    report = base64.b64decode(encoded.encode("ascii")).decode("ascii")
    report.should.match(r".*my-user.*")
@mock_iam
def test_boto3_get_credential_report():
    """Fetch the credential report via boto3 once generation completes."""
    client = boto3.client("iam", region_name="us-east-1")
    client.create_user(UserName="my-user")

    # No report has been generated yet, so fetching it fails.
    with pytest.raises(ClientError):
        client.get_credential_report()

    # Keep requesting generation until the report is marked complete.
    generated = client.generate_credential_report()
    while generated["State"] != "COMPLETE":
        generated = client.generate_credential_report()

    content = client.get_credential_report()["Content"]
    content.decode("utf-8").should.match(r".*my-user.*")
@mock_iam
def test_boto3_get_credential_report_content():
    """Verify the CSV header and the access-key columns of the report."""
    client = boto3.client("iam", region_name="us-east-1")
    username = "my-user"
    client.create_user(UserName=username)

    # First key is switched to Inactive; the second key keeps its default status.
    inactive_key = client.create_access_key(UserName=username)["AccessKey"]
    client.update_access_key(
        UserName=username, AccessKeyId=inactive_key["AccessKeyId"], Status="Inactive"
    )
    client.create_access_key(UserName=username)

    timestamp = datetime.utcnow()
    in_process = not settings.TEST_SERVER_MODE
    if in_process:
        # The last-used date is written straight into the backend, which is
        # only reachable when not running in server mode.
        backend = get_backend("iam")["global"]
        backend.users[username].access_keys[1].last_used = timestamp

    with pytest.raises(ClientError):
        client.get_credential_report()

    generated = client.generate_credential_report()
    while generated["State"] != "COMPLETE":
        generated = client.generate_credential_report()

    report = client.get_credential_report()["Content"].decode("utf-8")
    report_lines = report.split("\n")
    report_lines[0].should.equal(
        "user,arn,user_creation_time,password_enabled,password_last_used,password_last_changed,password_next_rotation,mfa_active,access_key_1_active,access_key_1_last_rotated,access_key_1_last_used_date,access_key_1_last_used_region,access_key_1_last_used_service,access_key_2_active,access_key_2_last_rotated,access_key_2_last_used_date,access_key_2_last_used_region,access_key_2_last_used_service,cert_1_active,cert_1_last_rotated,cert_2_active,cert_2_last_rotated"
    )

    row = next(csv.DictReader(report.split("\n")))
    today = timestamp.strftime("%Y-%m-%d")
    row["user"].should.equal("my-user")
    row["access_key_1_active"].should.equal("false")
    row["access_key_1_last_rotated"].should.match(today)
    row["access_key_1_last_used_date"].should.equal("N/A")
    row["access_key_2_active"].should.equal("true")
    if in_process:
        row["access_key_2_last_used_date"].should.match(today)
    else:
        row["access_key_2_last_used_date"].should.equal("N/A")
@mock_iam
def test_get_access_key_last_used_when_used():
    """get_access_key_last_used reports the date stored on the backend key."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = iam.meta.client
    username = "test-user"
    iam.create_user(UserName=username)

    with pytest.raises(ClientError):
        client.get_access_key_last_used(AccessKeyId="non-existent-key-id")

    new_key = client.create_access_key(UserName=username)["AccessKey"]

    # Moto currently has no mechanism for tracking access-key usage, so the
    # last-used date is set directly on the IAM backend. The backend is only
    # touched when not running in server mode.
    in_process = not settings.TEST_SERVER_MODE
    if in_process:
        timestamp = datetime.utcnow()
        backend = get_backend("iam")["global"]
        backend.users[username].access_keys[0].last_used = timestamp

    last_used = client.get_access_key_last_used(AccessKeyId=new_key["AccessKeyId"])[
        "AccessKeyLastUsed"
    ]

    if in_process:
        reported_day = last_used["LastUsedDate"].strftime("%Y-%m-%d")
        reported_day.should.equal(timestamp.strftime("%Y-%m-%d"))
    else:
        last_used.should_not.contain("LastUsedDate")
@requires_boto_gte("2.39")
@mock_iam_deprecated()
def test_managed_policy():
    """Exercise listing, attaching and detaching managed policies via boto."""
    conn = boto.connect_iam()
    emr_role_arn = "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"

    def collect_policies(**filters):
        """Follow the pagination markers and return every listed policy."""
        collected = []
        marker = 0
        while marker is not None:
            page = conn.list_policies(marker=marker, **filters)[
                "list_policies_response"
            ]["list_policies_result"]
            collected.extend(page["policies"])
            marker = page.get("marker")
        return collected

    def attached_policy_rows():
        """Return the policies reported by list_policies(only_attached=True)."""
        return conn.list_policies(only_attached=True)["list_policies_response"][
            "list_policies_result"
        ]["policies"]

    def attached_role_policies(name):
        """List the policies attached to the role *name*."""
        # boto has not implemented this end point but accessible this way
        reply = conn.get_response(
            "ListAttachedRolePolicies",
            {"RoleName": name},
            list_marker="AttachedPolicies",
        )
        return reply["list_attached_role_policies_response"][
            "list_attached_role_policies_result"
        ]["attached_policies"]

    conn.create_policy(
        policy_name="UserManagedPolicy",
        policy_document=MOCK_POLICY,
        path="/mypolicy/",
        description="my user managed policy",
    )

    # AWS-scoped listing matches the bundled AWS managed policies.
    aws_policies = collect_policies(scope="AWS")
    set(p.name for p in aws_managed_policies).should.equal(
        set(p["policy_name"] for p in aws_policies)
    )

    # Local-scoped listing contains only the policy created above.
    user_policies = conn.list_policies(scope="Local")["list_policies_response"][
        "list_policies_result"
    ]["policies"]
    set(["UserManagedPolicy"]).should.equal(
        set(p["policy_name"] for p in user_policies)
    )

    # The unscoped listing is the union of the two scoped ones.
    all_policies = collect_policies()
    set(p["policy_name"] for p in aws_policies + user_policies).should.equal(
        set(p["policy_name"] for p in all_policies)
    )

    role_name = "my-role"
    conn.create_role(
        role_name, assume_role_policy_document={"policy": "test"}, path="my-path"
    )
    for policy_name in (
        "AmazonElasticMapReduceRole",
        "AmazonElasticMapReduceforEC2Role",
    ):
        conn.attach_role_policy(
            "arn:aws:iam::aws:policy/service-role/" + policy_name, role_name
        )

    rows = attached_policy_rows()
    rows.should.have.length_of(2)
    for row in rows:
        int(row["attachment_count"]).should.be.greater_than(0)
    attached_role_policies(role_name).should.have.length_of(2)

    # Detaching one policy leaves a single attachment behind.
    conn.detach_role_policy(emr_role_arn, role_name)

    rows = attached_policy_rows()
    rows.should.have.length_of(1)
    for row in rows:
        int(row["attachment_count"]).should.be.greater_than(0)
    attached_role_policies(role_name).should.have.length_of(1)

    # Detaching an already-detached or unknown policy is an error.
    with pytest.raises(BotoServerError):
        conn.detach_role_policy(emr_role_arn, role_name)

    with pytest.raises(BotoServerError):
        conn.detach_role_policy("arn:aws:iam::aws:policy/Nonexistent", role_name)
@mock_iam
def test_boto3_create_login_profile():
    """A login profile needs an existing user and can only be created once."""
    profile_args = {"UserName": "my-user", "Password": "Password"}
    client = boto3.client("iam", region_name="us-east-1")

    # The user does not exist yet.
    with pytest.raises(ClientError):
        client.create_login_profile(**profile_args)

    client.create_user(UserName="my-user")
    client.create_login_profile(**profile_args)

    # A second profile for the same user is rejected.
    with pytest.raises(ClientError):
        client.create_login_profile(**profile_args)
@mock_iam()
def test_attach_detach_user_policy():
    """Attach a managed policy to a user, verify the listing, then detach it."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = boto3.client("iam", region_name="us-east-1")

    user = iam.create_user(UserName="test-user")
    policy_name = "UserAttachedPolicy"
    policy = iam.create_policy(
        PolicyName=policy_name,
        PolicyDocument=MOCK_POLICY,
        Path="/mypolicy/",
        Description="my user attached policy",
    )

    client.attach_user_policy(UserName=user.name, PolicyArn=policy.arn)

    attached = client.list_attached_user_policies(UserName=user.name)[
        "AttachedPolicies"
    ]
    attached.should.have.length_of(1)
    only_policy = attached[0]
    only_policy["PolicyArn"].should.equal(policy.arn)
    only_policy["PolicyName"].should.equal(policy_name)

    client.detach_user_policy(UserName=user.name, PolicyArn=policy.arn)

    attached = client.list_attached_user_policies(UserName=user.name)[
        "AttachedPolicies"
    ]
    attached.should.have.length_of(0)
@mock_iam
def test_update_access_key():
    """Toggle an access key between Inactive and Active."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = iam.meta.client
    username = "test-user"
    iam.create_user(UserName=username)

    # Updating a key that does not exist fails.
    with pytest.raises(ClientError):
        client.update_access_key(
            UserName=username, AccessKeyId="non-existent-key", Status="Inactive"
        )

    key_id = client.create_access_key(UserName=username)["AccessKey"]["AccessKeyId"]

    client.update_access_key(UserName=username, AccessKeyId=key_id, Status="Inactive")
    metadata = client.list_access_keys(UserName=username)["AccessKeyMetadata"]
    metadata[0]["Status"].should.equal("Inactive")

    # The UserName argument is omitted for the re-activation call.
    client.update_access_key(AccessKeyId=key_id, Status="Active")
    metadata = client.list_access_keys(UserName=username)["AccessKeyMetadata"]
    metadata[0]["Status"].should.equal("Active")
@mock_iam
def test_get_access_key_last_used_when_unused():
    """A freshly created key has no LastUsedDate in its last-used info."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = iam.meta.client
    username = "test-user"
    iam.create_user(UserName=username)

    with pytest.raises(ClientError):
        client.get_access_key_last_used(AccessKeyId="non-existent-key-id")

    new_key = client.create_access_key(UserName=username)["AccessKey"]
    usage = client.get_access_key_last_used(AccessKeyId=new_key["AccessKeyId"])

    usage["AccessKeyLastUsed"].should_not.contain("LastUsedDate")
    usage["UserName"].should.equal(new_key["UserName"])
@mock_iam
def test_upload_ssh_public_key():
    """Uploading an SSH public key returns the expected key metadata."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = iam.meta.client
    username = "test-user"
    iam.create_user(UserName=username)

    # The mock certificate body stands in for a public key.
    key_body = MOCK_CERT
    uploaded = client.upload_ssh_public_key(
        UserName=username, SSHPublicKeyBody=key_body
    )["SSHPublicKey"]

    uploaded["SSHPublicKeyBody"].should.equal(key_body)
    uploaded["UserName"].should.equal(username)
    uploaded["SSHPublicKeyId"].should.have.length_of(20)
    assert uploaded["SSHPublicKeyId"].startswith("APKA")
    uploaded.should.have.key("Fingerprint")
    uploaded["Status"].should.equal("Active")

    # The upload date is compared against the current UTC time.
    age = datetime.utcnow() - uploaded["UploadDate"].replace(tzinfo=None)
    age.seconds.should.be.within(0, 10)
@mock_iam
def test_get_ssh_public_key():
    """Retrieve an uploaded SSH public key by its id."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = iam.meta.client
    username = "test-user"
    iam.create_user(UserName=username)
    key_body = MOCK_CERT

    # Looking up an unknown key id fails.
    with pytest.raises(ClientError):
        client.get_ssh_public_key(
            UserName=username, SSHPublicKeyId="xxnon-existent-keyxx", Encoding="SSH"
        )

    uploaded = client.upload_ssh_public_key(
        UserName=username, SSHPublicKeyBody=key_body
    )
    key_id = uploaded["SSHPublicKey"]["SSHPublicKeyId"]

    fetched = client.get_ssh_public_key(
        UserName=username, SSHPublicKeyId=key_id, Encoding="SSH"
    )
    fetched["SSHPublicKey"]["SSHPublicKeyBody"].should.equal(key_body)
@mock_iam
def test_list_ssh_public_keys():
    """The key listing is empty at first and contains the key after upload."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = iam.meta.client
    username = "test-user"
    iam.create_user(UserName=username)
    key_body = MOCK_CERT

    keys = client.list_ssh_public_keys(UserName=username)["SSHPublicKeys"]
    keys.should.have.length_of(0)

    uploaded = client.upload_ssh_public_key(
        UserName=username, SSHPublicKeyBody=key_body
    )
    key_id = uploaded["SSHPublicKey"]["SSHPublicKeyId"]

    keys = client.list_ssh_public_keys(UserName=username)["SSHPublicKeys"]
    keys.should.have.length_of(1)
    keys[0]["SSHPublicKeyId"].should.equal(key_id)
@mock_iam
def test_update_ssh_public_key():
    """Switch an uploaded SSH public key from Active to Inactive."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = iam.meta.client
    username = "test-user"
    iam.create_user(UserName=username)
    key_body = MOCK_CERT

    # Updating an unknown key id fails.
    with pytest.raises(ClientError):
        client.update_ssh_public_key(
            UserName=username, SSHPublicKeyId="xxnon-existent-keyxx", Status="Inactive"
        )

    uploaded = client.upload_ssh_public_key(
        UserName=username, SSHPublicKeyBody=key_body
    )["SSHPublicKey"]
    key_id = uploaded["SSHPublicKeyId"]
    uploaded["Status"].should.equal("Active")

    client.update_ssh_public_key(
        UserName=username, SSHPublicKeyId=key_id, Status="Inactive"
    )

    fetched = client.get_ssh_public_key(
        UserName=username, SSHPublicKeyId=key_id, Encoding="SSH"
    )
    fetched["SSHPublicKey"]["Status"].should.equal("Inactive")
@mock_iam
def test_delete_ssh_public_key():
    """A deleted SSH public key disappears from the user's key listing."""
    iam = boto3.resource("iam", region_name="us-east-1")
    client = iam.meta.client
    username = "test-user"
    iam.create_user(UserName=username)
    key_body = MOCK_CERT

    # Deleting an unknown key id fails.
    with pytest.raises(ClientError):
        client.delete_ssh_public_key(
            UserName=username, SSHPublicKeyId="xxnon-existent-keyxx"
        )

    uploaded = client.upload_ssh_public_key(
        UserName=username, SSHPublicKeyBody=key_body
    )
    key_id = uploaded["SSHPublicKey"]["SSHPublicKeyId"]

    keys = client.list_ssh_public_keys(UserName=username)["SSHPublicKeys"]
    keys.should.have.length_of(1)

    client.delete_ssh_public_key(UserName=username, SSHPublicKeyId=key_id)

    keys = client.list_ssh_public_keys(UserName=username)["SSHPublicKeys"]
    keys.should.have.length_of(0)
@mock_iam
def test_get_account_authorization_details():
    """Check get_account_authorization_details output for each Filter value."""
    test_policy = json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {"Action": "s3:ListBucket", "Resource": "*", "Effect": "Allow"}
            ],
        }
    )
    expected_document = json.loads(test_policy)

    client = boto3.client("iam", region_name="us-east-1")
    boundary_arn = "arn:aws:iam::{}:policy/boundary".format(ACCOUNT_ID)
    managed_policy_arn = "arn:aws:iam::{}:policy/testPolicy".format(ACCOUNT_ID)

    client.create_role(
        RoleName="my-role",
        AssumeRolePolicyDocument="some policy",
        Path="/my-path/",
        Description="testing",
        PermissionsBoundary=boundary_arn,
    )
    client.create_user(Path="/", UserName="testUser")
    client.create_group(Path="/", GroupName="testGroup")
    client.create_policy(
        PolicyName="testPolicy",
        Path="/",
        PolicyDocument=test_policy,
        Description="Test Policy",
    )

    # Attach things to the user and group:
    client.put_user_policy(
        UserName="testUser", PolicyName="testPolicy", PolicyDocument=test_policy
    )
    client.put_group_policy(
        GroupName="testGroup", PolicyName="testPolicy", PolicyDocument=test_policy
    )
    client.attach_user_policy(UserName="testUser", PolicyArn=managed_policy_arn)
    client.attach_group_policy(GroupName="testGroup", PolicyArn=managed_policy_arn)
    client.add_user_to_group(UserName="testUser", GroupName="testGroup")

    # Add things to the role:
    client.create_instance_profile(InstanceProfileName="ipn")
    client.add_role_to_instance_profile(InstanceProfileName="ipn", RoleName="my-role")
    client.tag_role(
        RoleName="my-role",
        Tags=[
            {"Key": "somekey", "Value": "somevalue"},
            {"Key": "someotherkey", "Value": "someothervalue"},
        ],
    )
    client.put_role_policy(
        RoleName="my-role", PolicyName="test-policy", PolicyDocument=test_policy
    )
    client.attach_role_policy(RoleName="my-role", PolicyArn=managed_policy_arn)

    # Filter on roles only.
    result = client.get_account_authorization_details(Filter=["Role"])
    assert len(result["RoleDetailList"]) == 1
    assert len(result["UserDetailList"]) == 0
    assert len(result["GroupDetailList"]) == 0
    assert len(result["Policies"]) == 0
    role_detail = result["RoleDetailList"][0]
    assert len(role_detail["InstanceProfileList"]) == 1
    profile_role = role_detail["InstanceProfileList"][0]["Roles"][0]
    assert profile_role["Description"] == "testing"
    assert profile_role["PermissionsBoundary"] == {
        "PermissionsBoundaryType": "PermissionsBoundaryPolicy",
        "PermissionsBoundaryArn": "arn:aws:iam::{}:policy/boundary".format(ACCOUNT_ID),
    }
    assert len(role_detail["Tags"]) == 2
    assert len(role_detail["RolePolicyList"]) == 1
    assert len(role_detail["AttachedManagedPolicies"]) == 1
    role_attachment = role_detail["AttachedManagedPolicies"][0]
    assert role_attachment["PolicyName"] == "testPolicy"
    assert role_attachment["PolicyArn"] == managed_policy_arn
    assert role_detail["RolePolicyList"][0]["PolicyDocument"] == expected_document

    # Filter on users only.
    result = client.get_account_authorization_details(Filter=["User"])
    assert len(result["RoleDetailList"]) == 0
    assert len(result["UserDetailList"]) == 1
    user_detail = result["UserDetailList"][0]
    assert len(user_detail["GroupList"]) == 1
    assert len(user_detail["UserPolicyList"]) == 1
    assert len(user_detail["AttachedManagedPolicies"]) == 1
    assert len(result["GroupDetailList"]) == 0
    assert len(result["Policies"]) == 0
    user_attachment = user_detail["AttachedManagedPolicies"][0]
    assert user_attachment["PolicyName"] == "testPolicy"
    assert user_attachment["PolicyArn"] == managed_policy_arn
    assert user_detail["UserPolicyList"][0]["PolicyDocument"] == expected_document

    # Filter on groups only.
    result = client.get_account_authorization_details(Filter=["Group"])
    assert len(result["RoleDetailList"]) == 0
    assert len(result["UserDetailList"]) == 0
    assert len(result["GroupDetailList"]) == 1
    group_detail = result["GroupDetailList"][0]
    assert len(group_detail["GroupPolicyList"]) == 1
    assert len(group_detail["AttachedManagedPolicies"]) == 1
    assert len(result["Policies"]) == 0
    group_attachment = group_detail["AttachedManagedPolicies"][0]
    assert group_attachment["PolicyName"] == "testPolicy"
    assert group_attachment["PolicyArn"] == managed_policy_arn
    assert group_detail["GroupPolicyList"][0]["PolicyDocument"] == expected_document

    # Filter on customer-managed policies only.
    result = client.get_account_authorization_details(Filter=["LocalManagedPolicy"])
    assert len(result["RoleDetailList"]) == 0
    assert len(result["UserDetailList"]) == 0
    assert len(result["GroupDetailList"]) == 0
    assert len(result["Policies"]) == 1
    assert len(result["Policies"][0]["PolicyVersionList"]) == 1

    # Check for greater than 1 since this should always be greater than one but might change.
    # See iam/aws_managed_policies.py
    result = client.get_account_authorization_details(Filter=["AWSManagedPolicy"])
    assert len(result["RoleDetailList"]) == 0
    assert len(result["UserDetailList"]) == 0
    assert len(result["GroupDetailList"]) == 0
    assert len(result["Policies"]) > 1

    # Without a filter every category is populated.
    result = client.get_account_authorization_details()
    assert len(result["RoleDetailList"]) == 1
    assert len(result["UserDetailList"]) == 1
    assert len(result["GroupDetailList"]) == 1
    assert len(result["Policies"]) > 1
@mock_iam
def test_signing_certs():
    """Upload, update, list and delete signing certificates for a user."""
    user_name = "testing"
    unknown_user = "notauser"
    client = boto3.client("iam", region_name="us-east-1")

    # Create the IAM user first:
    client.create_user(UserName=user_name)

    # Upload the cert:
    uploaded = client.upload_signing_certificate(
        UserName=user_name, CertificateBody=MOCK_CERT
    )["Certificate"]
    cert_id = uploaded["CertificateId"]
    assert uploaded["UserName"] == "testing"
    assert uploaded["Status"] == "Active"
    assert uploaded["CertificateBody"] == MOCK_CERT
    assert uploaded["CertificateId"]

    # Upload a cert with an invalid body:
    with pytest.raises(ClientError) as ce:
        client.upload_signing_certificate(
            UserName=user_name, CertificateBody="notacert"
        )
    assert ce.value.response["Error"]["Code"] == "MalformedCertificate"

    # Upload with an invalid user:
    with pytest.raises(ClientError):
        client.upload_signing_certificate(
            UserName=unknown_user, CertificateBody=MOCK_CERT
        )

    # Update:
    client.update_signing_certificate(
        UserName=user_name, CertificateId=cert_id, Status="Inactive"
    )

    with pytest.raises(ClientError):
        client.update_signing_certificate(
            UserName=unknown_user, CertificateId=cert_id, Status="Inactive"
        )

    unknown_cert_id = "x" * 32
    with pytest.raises(ClientError) as ce:
        client.update_signing_certificate(
            UserName=user_name, CertificateId=unknown_cert_id, Status="Inactive"
        )
    error_message = ce.value.response["Error"]["Message"]
    assert error_message == "The Certificate with id {id} cannot be found.".format(
        id=unknown_cert_id
    )

    # List the certs:
    certificates = client.list_signing_certificates(UserName=user_name)[
        "Certificates"
    ]
    assert len(certificates) == 1
    assert certificates[0]["CertificateBody"] == MOCK_CERT
    # Changed with the update call above.
    assert certificates[0]["Status"] == "Inactive"

    with pytest.raises(ClientError):
        client.list_signing_certificates(UserName=unknown_user)

    # Delete:
    client.delete_signing_certificate(UserName=user_name, CertificateId=cert_id)

    with pytest.raises(ClientError):
        client.delete_signing_certificate(
            UserName=unknown_user, CertificateId=cert_id
        )
@mock_iam()
def test_create_saml_provider():
    """Creating a SAML provider returns the expected provider ARN."""
    client = boto3.client("iam", region_name="us-east-1")
    metadata_document = "a" * 1024

    created = client.create_saml_provider(
        Name="TestSAMLProvider", SAMLMetadataDocument=metadata_document
    )

    expected_arn = "arn:aws:iam::{}:saml-provider/TestSAMLProvider".format(ACCOUNT_ID)
    created["SAMLProviderArn"].should.equal(expected_arn)
@mock_iam()
def test_get_saml_provider():
    """A created SAML provider can be fetched back by its ARN."""
    client = boto3.client("iam", region_name="us-east-1")
    created = client.create_saml_provider(
        Name="TestSAMLProvider", SAMLMetadataDocument="a" * 1024
    )
    provider_arn = created["SAMLProviderArn"]

    fetched = client.get_saml_provider(SAMLProviderArn=provider_arn)

    fetched["SAMLMetadataDocument"].should.equal("a" * 1024)
@mock_iam()
def test_list_saml_providers():
    """The provider listing contains the ARN of the created SAML provider."""
    client = boto3.client("iam", region_name="us-east-1")
    client.create_saml_provider(
        Name="TestSAMLProvider", SAMLMetadataDocument="a" * 1024
    )

    providers = client.list_saml_providers()["SAMLProviderList"]

    expected_arn = "arn:aws:iam::{}:saml-provider/TestSAMLProvider".format(ACCOUNT_ID)
    providers[0]["Arn"].should.equal(expected_arn)
@mock_iam()
def test_delete_saml_provider():
    """Delete a SAML provider; also delete a nonexistent signing certificate."""
    client = boto3.client("iam", region_name="us-east-1")
    created = client.create_saml_provider(
        Name="TestSAMLProvider", SAMLMetadataDocument="a" * 1024
    )

    providers = client.list_saml_providers()["SAMLProviderList"]
    len(providers).should.equal(1)

    client.delete_saml_provider(SAMLProviderArn=created["SAMLProviderArn"])

    providers = client.list_saml_providers()["SAMLProviderList"]
    len(providers).should.equal(0)

    # Deleting a signing certificate that was never uploaded is an error.
    client.create_user(UserName="testing")
    cert_id = "123456789012345678901234"
    with pytest.raises(ClientError) as ce:
        client.delete_signing_certificate(UserName="testing", CertificateId=cert_id)
    error_message = ce.value.response["Error"]["Message"]
    assert error_message == "The Certificate with id {id} cannot be found.".format(
        id=cert_id
    )

    # Verify that it's not in the list:
    listed = client.list_signing_certificates(UserName="testing")
    assert not listed["Certificates"]
@mock_iam()
def test_create_role_defaults():
    """A role created with only required arguments gets the default values."""
    client = boto3.client("iam", region_name="us-east-1")
    client.create_role(RoleName="my-role", AssumeRolePolicyDocument="{}")

    # Fetch the role back and inspect the fields that were not supplied.
    fetched_role = client.get_role(RoleName="my-role")["Role"]

    assert fetched_role["MaxSessionDuration"] == 3600
    assert fetched_role.get("Description") is None
@mock_iam()
def test_create_role_with_tags():
    """Tests supplying Tags to create_role, including rejection of invalid tags"""
    client = boto3.client("iam", region_name="us-east-1")
    initial_tags = [
        {"Key": "somekey", "Value": "somevalue"},
        {"Key": "someotherkey", "Value": "someothervalue"},
    ]
    client.create_role(
        RoleName="my-role",
        AssumeRolePolicyDocument="{}",
        Tags=initial_tags,
        Description="testing",
    )

    # get_role must report both tags, in the order they were supplied:
    role = client.get_role(RoleName="my-role")["Role"]
    assert len(role["Tags"]) == 2
    for position, supplied in enumerate(initial_tags):
        assert role["Tags"][position]["Key"] == supplied["Key"]
        assert role["Tags"][position]["Value"] == supplied["Value"]
    assert role["Description"] == "testing"

    # Empty is good:
    client.create_role(
        RoleName="my-role2",
        AssumeRolePolicyDocument="{}",
        Tags=[{"Key": "somekey", "Value": ""}],
    )
    listed = client.list_role_tags(RoleName="my-role2")["Tags"]
    assert len(listed) == 1
    assert listed[0]["Key"] == "somekey"
    assert listed[0]["Value"] == ""

    # Test creating tags with invalid values. Each entry pairs a rejected tag
    # list with a fragment that must appear in the resulting error message.
    invalid_cases = [
        # With more than 50 tags:
        (
            [{"Key": str(n), "Value": str(n)} for n in range(0, 51)],
            "failed to satisfy constraint: Member must have length less than or equal to 50.",
        ),
        # With a duplicate tag:
        (
            [{"Key": "0", "Value": ""}, {"Key": "0", "Value": ""}],
            "Duplicate tag keys found. Please note that Tag keys are case insensitive.",
        ),
        # Duplicate tag with different casing:
        (
            [{"Key": "a", "Value": ""}, {"Key": "A", "Value": ""}],
            "Duplicate tag keys found. Please note that Tag keys are case insensitive.",
        ),
        # With a really big key:
        (
            [{"Key": "0" * 129, "Value": ""}],
            "Member must have length less than or equal to 128.",
        ),
        # With a really big value:
        (
            [{"Key": "0", "Value": "0" * 257}],
            "Member must have length less than or equal to 256.",
        ),
        # With an invalid character:
        (
            [{"Key": "NOWAY!", "Value": ""}],
            "Member must satisfy regular expression pattern: [\\p{L}\\p{Z}\\p{N}_.:/=+\\-@]+",
        ),
    ]
    for bad_tags, expected_fragment in invalid_cases:
        with pytest.raises(ClientError) as exc_info:
            client.create_role(
                RoleName="my-role3", AssumeRolePolicyDocument="{}", Tags=bad_tags,
            )
        assert expected_fragment in exc_info.value.response["Error"]["Message"]
@mock_iam()
def test_tag_role():
    """Tests both the tag_role and list_role_tags capability"""
    conn = boto3.client("iam", region_name="us-east-1")
    conn.create_role(RoleName="my-role", AssumeRolePolicyDocument="{}")

    # Get without tags:
    role = conn.get_role(RoleName="my-role")["Role"]
    assert not role.get("Tags")

    # With proper tag values:
    conn.tag_role(
        RoleName="my-role",
        Tags=[
            {"Key": "somekey", "Value": "somevalue"},
            {"Key": "someotherkey", "Value": "someothervalue"},
        ],
    )

    # Get role:
    role = conn.get_role(RoleName="my-role")["Role"]
    assert len(role["Tags"]) == 2
    assert role["Tags"][0]["Key"] == "somekey"
    assert role["Tags"][0]["Value"] == "somevalue"
    assert role["Tags"][1]["Key"] == "someotherkey"
    assert role["Tags"][1]["Value"] == "someothervalue"

    # Same -- but for list_role_tags. The assertions inspect the
    # list_role_tags response itself; re-checking `role` here would leave the
    # contents of the listing unverified.
    tags = conn.list_role_tags(RoleName="my-role")
    assert len(tags["Tags"]) == 2
    assert tags["Tags"][0]["Key"] == "somekey"
    assert tags["Tags"][0]["Value"] == "somevalue"
    assert tags["Tags"][1]["Key"] == "someotherkey"
    assert tags["Tags"][1]["Value"] == "someothervalue"
    assert not tags["IsTruncated"]
    assert not tags.get("Marker")

    # Test pagination:
    tags = conn.list_role_tags(RoleName="my-role", MaxItems=1)
    assert len(tags["Tags"]) == 1
    assert tags["IsTruncated"]
    assert tags["Tags"][0]["Key"] == "somekey"
    assert tags["Tags"][0]["Value"] == "somevalue"
    assert tags["Marker"] == "1"

    tags = conn.list_role_tags(RoleName="my-role", Marker=tags["Marker"])
    assert len(tags["Tags"]) == 1
    assert tags["Tags"][0]["Key"] == "someotherkey"
    assert tags["Tags"][0]["Value"] == "someothervalue"
    assert not tags["IsTruncated"]
    assert not tags.get("Marker")

    # Test updating an existing tag:
    conn.tag_role(
        RoleName="my-role", Tags=[{"Key": "somekey", "Value": "somenewvalue"}]
    )
    tags = conn.list_role_tags(RoleName="my-role")
    assert len(tags["Tags"]) == 2
    assert tags["Tags"][0]["Key"] == "somekey"
    assert tags["Tags"][0]["Value"] == "somenewvalue"

    # Empty is good:
    conn.tag_role(RoleName="my-role", Tags=[{"Key": "somekey", "Value": ""}])
    tags = conn.list_role_tags(RoleName="my-role")
    assert len(tags["Tags"]) == 2
    assert tags["Tags"][0]["Key"] == "somekey"
    assert tags["Tags"][0]["Value"] == ""

    # Test creating tags with invalid values. Each entry pairs a rejected tag
    # list with a fragment that must appear in the resulting error message.
    invalid_cases = [
        # With more than 50 tags:
        (
            [{"Key": str(x), "Value": str(x)} for x in range(0, 51)],
            "failed to satisfy constraint: Member must have length less than or equal to 50.",
        ),
        # With a duplicate tag:
        (
            [{"Key": "0", "Value": ""}, {"Key": "0", "Value": ""}],
            "Duplicate tag keys found. Please note that Tag keys are case insensitive.",
        ),
        # Duplicate tag with different casing:
        (
            [{"Key": "a", "Value": ""}, {"Key": "A", "Value": ""}],
            "Duplicate tag keys found. Please note that Tag keys are case insensitive.",
        ),
        # With a really big key:
        (
            [{"Key": "0" * 129, "Value": ""}],
            "Member must have length less than or equal to 128.",
        ),
        # With a really big value:
        (
            [{"Key": "0", "Value": "0" * 257}],
            "Member must have length less than or equal to 256.",
        ),
        # With an invalid character:
        (
            [{"Key": "NOWAY!", "Value": ""}],
            "Member must satisfy regular expression pattern: [\\p{L}\\p{Z}\\p{N}_.:/=+\\-@]+",
        ),
    ]
    for bad_tags, expected_fragment in invalid_cases:
        with pytest.raises(ClientError) as ce:
            conn.tag_role(RoleName="my-role", Tags=bad_tags)
        assert expected_fragment in ce.value.response["Error"]["Message"]

    # With a role that doesn't exist:
    with pytest.raises(ClientError):
        conn.tag_role(RoleName="notarole", Tags=[{"Key": "some", "Value": "value"}])
@mock_iam
def test_untag_role():
    """Tests removing tags with untag_role, including rejection of invalid keys"""
    client = boto3.client("iam", region_name="us-east-1")
    role_name = "my-role"
    client.create_role(RoleName=role_name, AssumeRolePolicyDocument="{}")

    # With proper tag values:
    client.tag_role(
        RoleName=role_name,
        Tags=[
            {"Key": "somekey", "Value": "somevalue"},
            {"Key": "someotherkey", "Value": "someothervalue"},
        ],
    )

    # Remove them:
    client.untag_role(RoleName=role_name, TagKeys=["somekey"])
    remaining = client.list_role_tags(RoleName=role_name)["Tags"]
    assert len(remaining) == 1
    assert remaining[0]["Key"] == "someotherkey"
    assert remaining[0]["Value"] == "someothervalue"

    # And again:
    client.untag_role(RoleName=role_name, TagKeys=["someotherkey"])
    remaining = client.list_role_tags(RoleName=role_name)["Tags"]
    assert not remaining

    # Test removing tags with invalid values. Each entry pairs a rejected key
    # list with a fragment that must appear in the resulting error message.
    invalid_cases = [
        # With more than 50 tags:
        (
            [str(n) for n in range(0, 51)],
            "failed to satisfy constraint: Member must have length less than or equal to 50.",
        ),
        # With a really big key:
        (["0" * 129], "Member must have length less than or equal to 128."),
        # With an invalid character:
        (
            ["NOWAY!"],
            "Member must satisfy regular expression pattern: [\\p{L}\\p{Z}\\p{N}_.:/=+\\-@]+",
        ),
    ]
    for bad_keys, expected_fragment in invalid_cases:
        with pytest.raises(ClientError) as exc_info:
            client.untag_role(RoleName=role_name, TagKeys=bad_keys)
        message = exc_info.value.response["Error"]["Message"]
        assert expected_fragment in message
        assert "tagKeys" in message

    # With a role that doesn't exist:
    with pytest.raises(ClientError):
        client.untag_role(RoleName="notarole", TagKeys=["somevalue"])
@mock_iam()
def test_update_role_description():
    """update_role_description returns the role it was applied to."""
    client = boto3.client("iam", region_name="us-east-1")
    role_name = "my-role"

    # The role has not been created yet, so deleting it must fail.
    with pytest.raises(ClientError):
        client.delete_role(RoleName=role_name)

    client.create_role(
        RoleName=role_name, AssumeRolePolicyDocument="some policy", Path="/my-path/",
    )
    updated = client.update_role_description(RoleName=role_name, Description="test")
    assert updated["Role"]["RoleName"] == "my-role"
@mock_iam()
def test_update_role():
    """update_role_description returns the role it was applied to.

    NOTE(review): a second function named ``test_update_role`` is defined
    directly after this one, so this definition is rebound at import time and
    is presumably never collected by pytest. Its body also repeats
    ``test_update_role_description``.
    """
    client = boto3.client("iam", region_name="us-east-1")
    role_name = "my-role"

    # The role has not been created yet, so deleting it must fail.
    with pytest.raises(ClientError):
        client.delete_role(RoleName=role_name)

    client.create_role(
        RoleName=role_name, AssumeRolePolicyDocument="some policy", Path="/my-path/",
    )
    updated = client.update_role_description(RoleName=role_name, Description="test")
    assert updated["Role"]["RoleName"] == "my-role"
@mock_iam()
def test_update_role():
    """update_role succeeds and its response carries exactly one top-level key."""
    client = boto3.client("iam", region_name="us-east-1")
    role_name = "my-role"

    # The role has not been created yet, so deleting it must fail.
    with pytest.raises(ClientError):
        client.delete_role(RoleName=role_name)

    client.create_role(
        RoleName=role_name, AssumeRolePolicyDocument="some policy", Path="/my-path/",
    )
    update_response = client.update_role(RoleName=role_name, Description="test")
    top_level_keys = update_response.keys()
    assert len(top_level_keys) == 1
@mock_iam()
def test_update_role_defaults():
    """Calling update_role with only a role name leaves the role with a 3600
    second session duration and no description, even though it was created
    with one."""
    client = boto3.client("iam", region_name="us-east-1")
    role_name = "my-role"

    # The role has not been created yet, so deleting it must fail.
    with pytest.raises(ClientError):
        client.delete_role(RoleName=role_name)

    client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument="some policy",
        Description="test",
        Path="/my-path/",
    )
    update_response = client.update_role(RoleName=role_name)
    assert len(update_response.keys()) == 1

    refreshed = client.get_role(RoleName=role_name)["Role"]
    assert refreshed.get("Description") is None
    assert refreshed["MaxSessionDuration"] == 3600
@mock_iam()
def test_list_entities_for_policy():
    """list_entities_for_policy reports the role, user and group a managed
    policy is attached to, for each EntityFilter value exercised below."""
    policy_document = json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {"Action": "s3:ListBucket", "Resource": "*", "Effect": "Allow"}
            ],
        }
    )
    policy_arn = "arn:aws:iam::{}:policy/testPolicy".format(ACCOUNT_ID)

    client = boto3.client("iam", region_name="us-east-1")
    client.create_role(
        RoleName="my-role", AssumeRolePolicyDocument="some policy", Path="/my-path/",
    )
    client.create_user(Path="/", UserName="testUser")
    client.create_group(Path="/", GroupName="testGroup")
    client.create_policy(
        PolicyName="testPolicy",
        Path="/",
        PolicyDocument=policy_document,
        Description="Test Policy",
    )

    # Attach things to the user and group:
    client.put_user_policy(
        UserName="testUser", PolicyName="testPolicy", PolicyDocument=policy_document
    )
    client.put_group_policy(
        GroupName="testGroup", PolicyName="testPolicy", PolicyDocument=policy_document,
    )
    client.attach_user_policy(UserName="testUser", PolicyArn=policy_arn)
    client.attach_group_policy(GroupName="testGroup", PolicyArn=policy_arn)
    client.add_user_to_group(UserName="testUser", GroupName="testGroup")

    # Add things to the role:
    client.create_instance_profile(InstanceProfileName="ipn")
    client.add_role_to_instance_profile(InstanceProfileName="ipn", RoleName="my-role")
    client.tag_role(
        RoleName="my-role",
        Tags=[
            {"Key": "somekey", "Value": "somevalue"},
            {"Key": "someotherkey", "Value": "someothervalue"},
        ],
    )
    client.put_role_policy(
        RoleName="my-role", PolicyName="test-policy", PolicyDocument=policy_document
    )
    client.attach_role_policy(RoleName="my-role", PolicyArn=policy_arn)

    expected_entities = {
        "PolicyRoles": [{"RoleName": "my-role"}],
        "PolicyUsers": [{"UserName": "testUser"}],
        "PolicyGroups": [{"GroupName": "testGroup"}],
    }

    # Each single-entity filter is checked against its own response key.
    single_filters = (
        ("Role", "PolicyRoles"),
        ("User", "PolicyUsers"),
        ("Group", "PolicyGroups"),
    )
    for entity_filter, response_key in single_filters:
        listing = client.list_entities_for_policy(
            PolicyArn=policy_arn, EntityFilter=entity_filter,
        )
        assert listing[response_key] == expected_entities[response_key]

    # The LocalManagedPolicy filter is expected to return all three kinds.
    listing = client.list_entities_for_policy(
        PolicyArn=policy_arn, EntityFilter="LocalManagedPolicy",
    )
    for response_key in ("PolicyGroups", "PolicyUsers", "PolicyRoles"):
        assert listing[response_key] == expected_entities[response_key]
@mock_iam()
def test_create_role_no_path():
    """A role created without a Path gets an ARN with no path segment, no
    permissions boundary, and the supplied description."""
    client = boto3.client("iam", region_name="us-east-1")
    created_role = client.create_role(
        RoleName="my-role", AssumeRolePolicyDocument="some policy", Description="test",
    ).get("Role")

    expected_arn = "arn:aws:iam::{}:role/my-role".format(ACCOUNT_ID)
    created_role.get("Arn").should.equal(expected_arn)
    created_role.should_not.have.key("PermissionsBoundary")
    created_role.get("Description").should.equal("test")
@mock_iam()
def test_create_role_with_permissions_boundary():
    """Covers creating a role with a permissions boundary, deleting and
    re-attaching the boundary, and rejection of malformed boundary ARNs."""
    conn = boto3.client("iam", region_name="us-east-1")
    boundary = "arn:aws:iam::{}:policy/boundary".format(ACCOUNT_ID)
    resp = conn.create_role(
        RoleName="my-role",
        AssumeRolePolicyDocument="some policy",
        Description="test",
        PermissionsBoundary=boundary,
    )
    expected = {
        "PermissionsBoundaryType": "PermissionsBoundaryPolicy",
        "PermissionsBoundaryArn": boundary,
    }
    resp.get("Role").get("PermissionsBoundary").should.equal(expected)
    resp.get("Role").get("Description").should.equal("test")

    conn.delete_role_permissions_boundary(RoleName="my-role")
    conn.list_roles().get("Roles")[0].should_not.have.key("PermissionsBoundary")

    conn.put_role_permissions_boundary(RoleName="my-role", PermissionsBoundary=boundary)
    # Read the role back instead of re-checking `resp`: that create_role
    # response predates the delete/put calls and would match `expected`
    # no matter what put_role_permissions_boundary did.
    conn.list_roles().get("Roles")[0].get("PermissionsBoundary").should.equal(expected)

    invalid_boundary_arn = "arn:aws:iam::123456789:not_a_boundary"

    with pytest.raises(ClientError):
        conn.put_role_permissions_boundary(
            RoleName="my-role", PermissionsBoundary=invalid_boundary_arn
        )

    with pytest.raises(ClientError):
        conn.create_role(
            RoleName="bad-boundary",
            AssumeRolePolicyDocument="some policy",
            Description="test",
            PermissionsBoundary=invalid_boundary_arn,
        )

    # Ensure the PermissionsBoundary is included in role listing as well
    conn.list_roles().get("Roles")[0].get("PermissionsBoundary").should.equal(expected)
@mock_iam
def test_create_role_with_same_name_should_fail():
    """Creating a second role with an existing name raises EntityAlreadyExists."""
    client = boto3.client("iam", region_name="us-east-1")
    role_name = str(uuid4())
    role_kwargs = {
        "RoleName": role_name,
        "AssumeRolePolicyDocument": "policy",
        "Description": "test",
    }
    client.create_role(**role_kwargs)

    # Create the role again, and verify that it fails
    with pytest.raises(ClientError) as exc_info:
        client.create_role(**role_kwargs)

    error = exc_info.value.response["Error"]
    error["Code"].should.equal("EntityAlreadyExists")
    error["Message"].should.equal(
        "Role with name {0} already exists.".format(role_name)
    )
@mock_iam
def test_create_policy_with_same_name_should_fail():
    """Creating a second policy with an existing name raises EntityAlreadyExists."""
    iam = boto3.client("iam", region_name="us-east-1")
    test_policy_name = str(uuid4())
    # The response of the first create_policy call is not needed by the test.
    iam.create_policy(PolicyName=test_policy_name, PolicyDocument=MOCK_POLICY)

    # Create the policy again, and verify that it fails
    with pytest.raises(ClientError) as err:
        iam.create_policy(PolicyName=test_policy_name, PolicyDocument=MOCK_POLICY)
    err.value.response["Error"]["Code"].should.equal("EntityAlreadyExists")
    err.value.response["Error"]["Message"].should.equal(
        "A policy called {0} already exists. Duplicate names are not allowed.".format(
            test_policy_name
        )
    )
@mock_iam
def test_create_open_id_connect_provider():
    """create_open_id_connect_provider returns the expected ARN for each of
    the URLs below (plain host, host with path, host with path and query)."""
    client = boto3.client("iam", region_name="us-east-1")

    # (request arguments, expected ARN template) in the order they are created
    cases = [
        (
            # even it is required to provide at least one thumbprint, AWS accepts an empty list
            {"Url": "https://example.com", "ThumbprintList": []},
            "arn:aws:iam::{}:oidc-provider/example.com",
        ),
        (
            {
                "Url": "http://example.org",
                "ThumbprintList": ["b" * 40],
                "ClientIDList": ["b"],
            },
            "arn:aws:iam::{}:oidc-provider/example.org",
        ),
        (
            {"Url": "http://example.org/oidc", "ThumbprintList": []},
            "arn:aws:iam::{}:oidc-provider/example.org/oidc",
        ),
        (
            {"Url": "http://example.org/oidc-query?test=true", "ThumbprintList": []},
            "arn:aws:iam::{}:oidc-provider/example.org/oidc-query",
        ),
    ]
    for request_kwargs, arn_template in cases:
        created = client.create_open_id_connect_provider(**request_kwargs)
        created["OpenIDConnectProviderArn"].should.equal(
            arn_template.format(ACCOUNT_ID)
        )
@mock_iam
def test_create_open_id_connect_provider_errors():
    """create_open_id_connect_provider rejects duplicates, malformed URLs,
    too many thumbprints or client IDs, and over-long field values."""
    client = boto3.client("iam", region_name="us-east-1")
    client.create_open_id_connect_provider(Url="https://example.com", ThumbprintList=[])

    # Same URL a second time:
    client.create_open_id_connect_provider.when.called_with(
        Url="https://example.com", ThumbprintList=[]
    ).should.throw(ClientError, "Unknown")

    # URLs without a scheme:
    for malformed_url in ("example.org", "example"):
        client.create_open_id_connect_provider.when.called_with(
            Url=malformed_url, ThumbprintList=[]
        ).should.throw(ClientError, "Invalid Open ID Connect Provider URL")

    # Six thumbprints:
    six_thumbprints = [letter * 40 for letter in "abcdef"]
    client.create_open_id_connect_provider.when.called_with(
        Url="http://example.org", ThumbprintList=six_thumbprints,
    ).should.throw(ClientError, "Thumbprint list must contain fewer than 5 entries.")

    # 101 client IDs:
    too_many_client_ids = ["{}".format(i) for i in range(101)]
    client.create_open_id_connect_provider.when.called_with(
        Url="http://example.org", ThumbprintList=[], ClientIDList=too_many_client_ids,
    ).should.throw(
        ClientError, "Cannot exceed quota for ClientIdsPerOpenIdConnectProvider: 100",
    )

    # Every field too long at once; all three violations must be reported.
    too_long_url = "b" * 256
    too_long_thumbprint = "b" * 41
    too_long_client_id = "b" * 256
    validation_template = (
        "3 validation errors detected: "
        'Value "{0}" at "clientIDList" failed to satisfy constraint: '
        "Member must satisfy constraint: "
        "[Member must have length less than or equal to 255, "
        "Member must have length greater than or equal to 1]; "
        'Value "{1}" at "thumbprintList" failed to satisfy constraint: '
        "Member must satisfy constraint: "
        "[Member must have length less than or equal to 40, "
        "Member must have length greater than or equal to 40]; "
        'Value "{2}" at "url" failed to satisfy constraint: '
        "Member must have length less than or equal to 255"
    )
    client.create_open_id_connect_provider.when.called_with(
        Url=too_long_url,
        ThumbprintList=[too_long_thumbprint],
        ClientIDList=[too_long_client_id],
    ).should.throw(
        ClientError,
        validation_template.format(
            [too_long_client_id], [too_long_thumbprint], too_long_url
        ),
    )
@mock_iam
def test_delete_open_id_connect_provider():
    """A deleted OIDC provider can no longer be fetched, and deleting it a
    second time does not raise."""
    client = boto3.client("iam", region_name="us-east-1")
    provider_arn = client.create_open_id_connect_provider(
        Url="https://example.com", ThumbprintList=[]
    )["OpenIDConnectProviderArn"]

    client.delete_open_id_connect_provider(OpenIDConnectProviderArn=provider_arn)

    expected_message = "OpenIDConnect Provider not found for arn {}".format(
        provider_arn
    )
    client.get_open_id_connect_provider.when.called_with(
        OpenIDConnectProviderArn=provider_arn
    ).should.throw(ClientError, expected_message)

    # deleting a non existing provider should be successful
    client.delete_open_id_connect_provider(OpenIDConnectProviderArn=provider_arn)
@mock_iam
def test_get_open_id_connect_provider():
    """get_open_id_connect_provider returns the URL without its scheme, the
    thumbprints, the client IDs and a datetime creation date."""
    thumbprints = ["b" * 40]
    client_ids = ["b"]
    client = boto3.client("iam", region_name="us-east-1")
    provider_arn = client.create_open_id_connect_provider(
        Url="https://example.com", ThumbprintList=thumbprints, ClientIDList=client_ids
    )["OpenIDConnectProviderArn"]

    provider = client.get_open_id_connect_provider(
        OpenIDConnectProviderArn=provider_arn
    )

    provider["Url"].should.equal("example.com")
    provider["ThumbprintList"].should.equal(["b" * 40])
    provider["ClientIDList"].should.equal(["b"])
    provider.should.have.key("CreateDate").should.be.a(datetime)
@mock_iam
def test_get_open_id_connect_provider_errors():
    """Fetching an OIDC provider by an unknown ARN raises a not-found error."""
    client = boto3.client("iam", region_name="us-east-1")
    existing_arn = client.create_open_id_connect_provider(
        Url="https://example.com", ThumbprintList=["b" * 40], ClientIDList=["b"]
    )["OpenIDConnectProviderArn"]

    # Derive an ARN that is guaranteed not to match the provider just created.
    missing_arn = existing_arn + "-not-existing"
    client.get_open_id_connect_provider.when.called_with(
        OpenIDConnectProviderArn=missing_arn
    ).should.throw(
        ClientError, "OpenIDConnect Provider not found for arn {}".format(missing_arn),
    )
@mock_iam
def test_list_open_id_connect_providers():
    """list_open_id_connect_providers returns the ARN of every provider created."""
    client = boto3.client("iam", region_name="us-east-1")

    provider_requests = [
        {"Url": "https://example.com", "ThumbprintList": []},
        {
            "Url": "http://example.org",
            "ThumbprintList": ["b" * 40],
            "ClientIDList": ["b"],
        },
        {"Url": "http://example.org/oidc", "ThumbprintList": []},
    ]
    created_arns = []
    for request_kwargs in provider_requests:
        created = client.create_open_id_connect_provider(**request_kwargs)
        created_arns.append(created["OpenIDConnectProviderArn"])

    listing = client.list_open_id_connect_providers()

    # The listing is sorted by ARN before comparing against creation order.
    listed_by_arn = sorted(
        listing["OpenIDConnectProviderList"], key=lambda entry: entry["Arn"]
    )
    listed_by_arn.should.equal([{"Arn": arn} for arn in created_arns])
@mock_iam
def test_update_account_password_policy():
    """update_account_password_policy with no arguments stores the default policy."""
    client = boto3.client("iam", region_name="us-east-1")
    client.update_account_password_policy()

    # Every boolean setting is expected to be off; only the length is non-false.
    expected_policy = {"MinimumPasswordLength": 6}
    for flag in (
        "AllowUsersToChangePassword",
        "ExpirePasswords",
        "RequireLowercaseCharacters",
        "RequireNumbers",
        "RequireSymbols",
        "RequireUppercaseCharacters",
        "HardExpiry",
    ):
        expected_policy[flag] = False

    stored_policy = client.get_account_password_policy()["PasswordPolicy"]
    stored_policy.should.equal(expected_policy)
@mock_iam
def test_update_account_password_policy_errors():
    """Out-of-range password policy values are all reported in one validation error."""
    client = boto3.client("iam", region_name="us-east-1")

    # Each value is one above the maximum quoted in the expected message.
    out_of_range = {
        "MaxPasswordAge": 1096,
        "MinimumPasswordLength": 129,
        "PasswordReusePrevention": 25,
    }
    expected_message = (
        "3 validation errors detected: "
        'Value "129" at "minimumPasswordLength" failed to satisfy constraint: '
        "Member must have value less than or equal to 128; "
        'Value "25" at "passwordReusePrevention" failed to satisfy constraint: '
        "Member must have value less than or equal to 24; "
        'Value "1096" at "maxPasswordAge" failed to satisfy constraint: '
        "Member must have value less than or equal to 1095"
    )

    client.update_account_password_policy.when.called_with(
        **out_of_range
    ).should.throw(ClientError, expected_message)
@mock_iam
def test_get_account_password_policy():
    """get_account_password_policy returns every setting that was stored,
    plus ExpirePasswords set to True."""
    client = boto3.client("iam", region_name="us-east-1")
    requested_settings = {
        "AllowUsersToChangePassword": True,
        "HardExpiry": True,
        "MaxPasswordAge": 60,
        "MinimumPasswordLength": 10,
        "PasswordReusePrevention": 3,
        "RequireLowercaseCharacters": True,
        "RequireNumbers": True,
        "RequireSymbols": True,
        "RequireUppercaseCharacters": True,
    }
    client.update_account_password_policy(**requested_settings)

    # ExpirePasswords is not a request argument but is part of the response.
    expected_policy = dict(requested_settings)
    expected_policy["ExpirePasswords"] = True

    stored_policy = client.get_account_password_policy()["PasswordPolicy"]
    stored_policy.should.equal(expected_policy)
@mock_iam
def test_get_account_password_policy_errors():
    """Reading the password policy before one has been set raises a not-found error."""
    client = boto3.client("iam", region_name="us-east-1")
    expected_message = "The Password Policy with domain name {} cannot be found.".format(
        ACCOUNT_ID
    )

    client.get_account_password_policy.when.called_with().should.throw(
        ClientError, expected_message,
    )
@mock_iam
def test_delete_account_password_policy():
    """After delete_account_password_policy the policy can no longer be read."""
    client = boto3.client("iam", region_name="us-east-1")
    client.update_account_password_policy()

    # The policy is readable before the delete...
    before_delete = client.get_account_password_policy()
    before_delete.should.have.key("PasswordPolicy").which.should.be.a(dict)

    client.delete_account_password_policy()

    # ...and reported as missing afterwards.
    expected_message = "The Password Policy with domain name {} cannot be found.".format(
        ACCOUNT_ID
    )
    client.get_account_password_policy.when.called_with().should.throw(
        ClientError, expected_message,
    )
@mock_iam
def test_delete_account_password_policy_errors():
    """Deleting the password policy when none has been set raises a not-found error."""
    client = boto3.client("iam", region_name="us-east-1")
    expected_message = "The account policy with name PasswordPolicy cannot be found."

    client.delete_account_password_policy.when.called_with().should.throw(
        ClientError, expected_message,
    )
@mock_iam
def test_get_account_summary():
    """The account summary starts with zeroed usage counters and reflects the
    resources created afterwards; quota entries are the same in both checks."""
    client = boto3.client("iam", region_name="us-east-1")
    iam = boto3.resource("iam", region_name="us-east-1")
    account_summary = iam.AccountSummary()

    # Entries asserted to be identical before and after resources are created.
    unchanged_entries = {
        "GroupPolicySizeQuota": 5120,
        "InstanceProfilesQuota": 1000,
        "GroupsPerUserQuota": 10,
        "AttachedPoliciesPerUserQuota": 10,
        "PoliciesQuota": 1500,
        "AccountMFAEnabled": 0,
        "AccessKeysPerUserQuota": 2,
        "AssumeRolePolicySizeQuota": 2048,
        "PolicyVersionsInUseQuota": 10000,
        "GlobalEndpointTokenVersion": 1,
        "VersionsPerPolicyQuota": 5,
        "AttachedPoliciesPerGroupQuota": 10,
        "PolicySizeQuota": 6144,
        "AccountSigningCertificatesPresent": 0,
        "UsersQuota": 5000,
        "ServerCertificatesQuota": 20,
        "UserPolicySizeQuota": 2048,
        "RolesQuota": 1000,
        "SigningCertificatesPerUserQuota": 2,
        "RolePolicySizeQuota": 10240,
        "AttachedPoliciesPerRoleQuota": 10,
        "AccountAccessKeysPresent": 0,
        "GroupsQuota": 300,
    }

    # Usage counters expected once the resources below exist.
    populated_counters = {
        "Policies": 1,
        "InstanceProfiles": 1,
        "Users": 1,
        "Providers": 2,
        "Groups": 1,
        "MFADevices": 1,
        "PolicyVersionsInUse": 3,
        "ServerCertificates": 1,
        "Roles": 1,
        "MFADevicesInUse": 1,
    }

    # With nothing created, every usage counter is expected to be zero.
    expected_empty = dict(unchanged_entries)
    expected_empty.update(dict.fromkeys(populated_counters, 0))
    account_summary.summary_map.should.equal(expected_empty)

    client.create_instance_profile(InstanceProfileName="test-profile")
    client.create_open_id_connect_provider(Url="https://example.com", ThumbprintList=[])
    policy_arn = client.create_policy(
        PolicyName="test-policy", PolicyDocument=MOCK_POLICY
    )["Policy"]["Arn"]
    client.create_role(RoleName="test-role", AssumeRolePolicyDocument="test policy")
    client.attach_role_policy(RoleName="test-role", PolicyArn=policy_arn)
    client.create_saml_provider(
        Name="TestSAMLProvider", SAMLMetadataDocument="a" * 1024
    )
    client.create_group(GroupName="test-group")
    client.attach_group_policy(GroupName="test-group", PolicyArn=policy_arn)
    client.create_user(UserName="test-user")
    client.attach_user_policy(UserName="test-user", PolicyArn=policy_arn)
    client.enable_mfa_device(
        UserName="test-user",
        SerialNumber="123456789",
        AuthenticationCode1="234567",
        AuthenticationCode2="987654",
    )
    client.create_virtual_mfa_device(VirtualMFADeviceName="test-device")
    client.upload_server_certificate(
        ServerCertificateName="test-cert",
        CertificateBody="cert-body",
        PrivateKey="private-key",
    )

    # Refresh the resource so the summary reflects the calls above.
    account_summary.load()

    expected_populated = dict(unchanged_entries)
    expected_populated.update(populated_counters)
    account_summary.summary_map.should.equal(expected_populated)
@mock_iam()
def test_list_user_tags():
    """Tests setting tags on a user in create_user and reading them with list_user_tags"""
    client = boto3.client("iam", region_name="us-east-1")

    # One user without tags, one with a single tag, one with two tags.
    client.create_user(UserName="kenny-bania")
    client.create_user(
        UserName="jackie-chiles", Tags=[{"Key": "Sue-Allen", "Value": "Oh-Henry"}],
    )
    client.create_user(
        UserName="cosmo",
        Tags=[
            {"Key": "Stan", "Value": "The Caddy"},
            {"Key": "like-a", "Value": "glove"},
        ],
    )

    expected_tags_by_user = [
        ("kenny-bania", []),
        ("jackie-chiles", [{"Key": "Sue-Allen", "Value": "Oh-Henry"}]),
        (
            "cosmo",
            [
                {"Key": "Stan", "Value": "The Caddy"},
                {"Key": "like-a", "Value": "glove"},
            ],
        ),
    ]
    for user_name, expected_tags in expected_tags_by_user:
        listing = client.list_user_tags(UserName=user_name)
        listing["Tags"].should.equal(expected_tags)
        listing["IsTruncated"].should_not.be.ok
@mock_iam()
def test_delete_role_with_instance_profiles_present():
    """Deleting a role removes only that role, while another role that sits
    in an instance profile stays listed."""
    client = boto3.client("iam", region_name="us-east-1")

    assume_role_document = """
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
""".strip()

    # Role1 is placed in an instance profile; Role2 is created and deleted.
    client.create_role(RoleName="Role1", AssumeRolePolicyDocument=assume_role_document)
    client.create_instance_profile(InstanceProfileName="IP1")
    client.add_role_to_instance_profile(InstanceProfileName="IP1", RoleName="Role1")

    client.create_role(RoleName="Role2", AssumeRolePolicyDocument=assume_role_document)
    client.delete_role(RoleName="Role2")

    remaining_names = [entry["RoleName"] for entry in client.list_roles()["Roles"]]
    assert "Role1" in remaining_names
    assert "Role2" not in remaining_names
@mock_iam
def test_delete_account_password_policy_errors():
    """Deleting the password policy when none has been set raises a not-found error.

    NOTE(review): a test with this exact name and the same checks is defined
    earlier in this file; this later definition rebinds the name at import time.
    """
    client = boto3.client("iam", region_name="us-east-1")
    expected_message = "The account policy with name PasswordPolicy cannot be found."

    client.delete_account_password_policy.when.called_with().should.throw(
        ClientError, expected_message,
    )
@mock_iam
def test_role_list_config_discovered_resources():
    """Checks role_config_query.list_config_service_resources with no filter,
    with a list of resource ids, with a resource name, and with both."""
    from moto.iam.config import role_config_query

    # Without any roles
    assert role_config_query.list_config_service_resources(None, None, 100, None) == (
        [],
        None,
    )

    # Make 3 roles
    roles = []
    num_roles = 3
    for ix in range(1, num_roles + 1):
        this_role = role_config_query.backends["global"].create_role(
            role_name="role{}".format(ix),
            assume_role_policy_document=None,
            path="/",
            permissions_boundary=None,
            description="role{}".format(ix),
            tags=[{"Key": "foo", "Value": "bar"}],
            max_session_duration=3600,
        )
        roles.append({"id": this_role.id, "name": this_role.name})

    assert len(roles) == num_roles

    result = role_config_query.list_config_service_resources(None, None, 100, None)[0]
    assert len(result) == num_roles

    # The roles gets a random ID, so we can't directly test it
    role = result[0]
    assert role["type"] == "AWS::IAM::Role"
    assert role["id"] in [created["id"] for created in roles]
    assert role["name"] in [created["name"] for created in roles]
    assert role["region"] == "global"

    # test passing list of resource ids
    resource_ids = role_config_query.list_config_service_resources(
        [roles[0]["id"], roles[1]["id"]], None, 100, None
    )[0]
    assert len(resource_ids) == 2

    # test passing a single resource name
    resource_name = role_config_query.list_config_service_resources(
        None, roles[0]["name"], 100, None
    )[0]
    assert len(resource_name) == 1
    assert resource_name[0]["id"] == roles[0]["id"]
    assert resource_name[0]["name"] == roles[0]["name"]

    # test passing a single resource name AND some resource id's
    both_filter_good = role_config_query.list_config_service_resources(
        [roles[0]["id"], roles[1]["id"]], roles[0]["name"], 100, None
    )[0]
    assert len(both_filter_good) == 1
    assert both_filter_good[0]["id"] == roles[0]["id"]
    assert both_filter_good[0]["name"] == roles[0]["name"]

    # A name that is not among the requested ids must match nothing.
    both_filter_bad = role_config_query.list_config_service_resources(
        [roles[0]["id"], roles[1]["id"]], roles[2]["name"], 100, None
    )[0]
    assert len(both_filter_bad) == 0
@mock_iam
def test_role_config_dict():
    """Check the AWS Config dictionary produced by each role's ``to_config_dict()``.

    Five roles are created directly on the IAM backend: a plain one, one with
    an assume-role document, one with a permissions boundary, one with an
    attached managed policy and one with an inline policy. Each role is located
    through ``role_config_query.list_config_service_resources`` and its config
    dictionary is then verified.
    """
    from moto.iam.config import role_config_query, policy_config_query
    from moto.iam.utils import random_resource_id, random_policy_id

    role_backend = role_config_query.backends["global"]

    def list_roles():
        # Return every role currently reported by the config query.
        return role_config_query.list_config_service_resources(
            None, None, 100, None
        )[0]

    def create_role(
        name, assume_role_policy_document, permissions_boundary=None, tags=None
    ):
        # Create a role on the backend; the description always mirrors the name.
        role_backend.create_role(
            role_name=name,
            assume_role_policy_document=assume_role_policy_document,
            path="/",
            permissions_boundary=permissions_boundary,
            description=name,
            tags=tags if tags is not None else [],
            max_session_duration=3600,
        )

    def find_new_role(known_roles):
        # Return the listed role that is not in ``known_roles`` and check its id.
        known_ids = [known["id"] for known in known_roles]
        new_role = next(role for role in list_roles() if role["id"] not in known_ids)
        assert new_role is not None
        assert len(new_role["id"]) == len(random_resource_id())
        # Ids are strings, so compare them by value: an identity check
        # (``is not``) would also pass for two equal strings stored in
        # distinct objects and therefore prove nothing.
        for known_id in known_ids:
            assert new_role["id"] != known_id
        return new_role

    # Without any roles
    assert not role_config_query.get_config_resource("something")
    assert role_config_query.list_config_service_resources(None, None, 100, None) == (
        [],
        None,
    )

    basic_assume_role = {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "sts:AssumeRole"}
        ],
    }

    basic_policy = {
        "Version": "2012-10-17",
        "Statement": [{"Action": ["ec2:*"], "Effect": "Allow", "Resource": "*"}],
    }

    # Create a policy for use in role permissions boundary
    policy_arn = (
        policy_config_query.backends["global"]
        .create_policy(
            description="basic_policy",
            path="/",
            policy_document=json.dumps(basic_policy),
            policy_name="basic_policy",
        )
        .arn
    )

    policy_id = policy_config_query.list_config_service_resources(
        None, None, 100, None
    )[0][0]["id"]
    assert len(policy_id) == len(random_policy_id())

    # Create the roles one at a time. Role ids are random, so after each
    # creation the new role is identified as "the one we have not seen yet".
    create_role("plain_role", None, tags=[{"Key": "foo", "Value": "bar"}])
    plain_role = find_new_role([])

    create_role("assume_role", json.dumps(basic_assume_role))
    assume_role = find_new_role([plain_role])

    create_role(
        "assume_and_permission_boundary_role",
        json.dumps(basic_assume_role),
        permissions_boundary=policy_arn,
    )
    assume_and_permission_boundary_role = find_new_role([plain_role, assume_role])

    create_role("role_with_attached_policy", json.dumps(basic_assume_role))
    role_backend.attach_role_policy(policy_arn, "role_with_attached_policy")
    role_with_attached_policy = find_new_role(
        [plain_role, assume_role, assume_and_permission_boundary_role]
    )

    create_role("role_with_inline_policy", json.dumps(basic_assume_role))
    role_backend.put_role_policy(
        "role_with_inline_policy", "inline_policy", json.dumps(basic_policy)
    )
    role_with_inline_policy = find_new_role(
        [
            plain_role,
            assume_role,
            assume_and_permission_boundary_role,
            role_with_attached_policy,
        ]
    )

    # plain role
    plain_role_config = role_backend.roles[plain_role["id"]].to_config_dict()
    assert plain_role_config["version"] == "1.3"
    assert plain_role_config["configurationItemStatus"] == "ResourceDiscovered"
    assert plain_role_config["configurationStateId"] is not None
    assert plain_role_config["arn"] == "arn:aws:iam::123456789012:role/plain_role"
    assert plain_role_config["resourceType"] == "AWS::IAM::Role"
    assert plain_role_config["resourceId"] == "plain_role"
    assert plain_role_config["resourceName"] == "plain_role"
    assert plain_role_config["awsRegion"] == "global"
    assert plain_role_config["availabilityZone"] == "Not Applicable"
    assert plain_role_config["resourceCreationTime"] is not None
    assert plain_role_config["tags"] == {"foo": {"Key": "foo", "Value": "bar"}}
    assert plain_role_config["configuration"]["path"] == "/"
    assert plain_role_config["configuration"]["roleName"] == "plain_role"
    assert plain_role_config["configuration"]["roleId"] == plain_role["id"]
    assert plain_role_config["configuration"]["arn"] == plain_role_config["arn"]
    assert plain_role_config["configuration"]["assumeRolePolicyDocument"] is None
    assert plain_role_config["configuration"]["instanceProfileList"] == []
    assert plain_role_config["configuration"]["rolePolicyList"] == []
    assert plain_role_config["configuration"]["attachedManagedPolicies"] == []
    assert plain_role_config["configuration"]["permissionsBoundary"] is None
    assert plain_role_config["configuration"]["tags"] == [
        {"key": "foo", "value": "bar"}
    ]
    assert plain_role_config["supplementaryConfiguration"] == {}

    # assume_role
    assume_role_config = role_backend.roles[assume_role["id"]].to_config_dict()
    assert assume_role_config["arn"] == "arn:aws:iam::123456789012:role/assume_role"
    assert assume_role_config["resourceId"] == "assume_role"
    assert assume_role_config["resourceName"] == "assume_role"
    assert assume_role_config["configuration"][
        "assumeRolePolicyDocument"
    ] == parse.quote(json.dumps(basic_assume_role))

    # assume_and_permission_boundary_role
    assume_and_permission_boundary_role_config = role_backend.roles[
        assume_and_permission_boundary_role["id"]
    ].to_config_dict()
    assert (
        assume_and_permission_boundary_role_config["arn"]
        == "arn:aws:iam::123456789012:role/assume_and_permission_boundary_role"
    )
    assert (
        assume_and_permission_boundary_role_config["resourceId"]
        == "assume_and_permission_boundary_role"
    )
    assert (
        assume_and_permission_boundary_role_config["resourceName"]
        == "assume_and_permission_boundary_role"
    )
    assert assume_and_permission_boundary_role_config["configuration"][
        "assumeRolePolicyDocument"
    ] == parse.quote(json.dumps(basic_assume_role))
    assert (
        assume_and_permission_boundary_role_config["configuration"][
            "permissionsBoundary"
        ]
        == policy_arn
    )

    # role_with_attached_policy
    role_with_attached_policy_config = role_backend.roles[
        role_with_attached_policy["id"]
    ].to_config_dict()
    assert (
        role_with_attached_policy_config["arn"]
        == "arn:aws:iam::123456789012:role/role_with_attached_policy"
    )
    assert role_with_attached_policy_config["configuration"][
        "attachedManagedPolicies"
    ] == [{"policyArn": policy_arn, "policyName": "basic_policy"}]

    # role_with_inline_policy
    role_with_inline_policy_config = role_backend.roles[
        role_with_inline_policy["id"]
    ].to_config_dict()
    assert (
        role_with_inline_policy_config["arn"]
        == "arn:aws:iam::123456789012:role/role_with_inline_policy"
    )
    assert role_with_inline_policy_config["configuration"]["rolePolicyList"] == [
        {
            "policyName": "inline_policy",
            "policyDocument": parse.quote(json.dumps(basic_policy)),
        }
    ]
@mock_iam
@mock_config
def test_role_config_client():
    """Exercise IAM roles through the boto3 ``config`` client.

    Covers plain and aggregated discovery, pagination, name/id filters and the
    batch-get calls for resources of type ``AWS::IAM::Role``.
    """
    from moto.iam.models import ACCOUNT_ID
    from moto.iam.utils import random_resource_id

    CONFIG_REGIONS = boto3.Session().get_available_regions("config")

    iam_client = boto3.client("iam", region_name="us-west-2")
    config_client = boto3.client("config", region_name="us-west-2")

    all_account_aggregation_source = {
        "AccountIds": [ACCOUNT_ID],
        "AllAwsRegions": True,
    }

    two_region_account_aggregation_source = {
        "AccountIds": [ACCOUNT_ID],
        "AwsRegions": ["us-east-1", "us-west-2"],
    }

    config_client.put_configuration_aggregator(
        ConfigurationAggregatorName="test_aggregator",
        AccountAggregationSources=[all_account_aggregation_source],
    )

    config_client.put_configuration_aggregator(
        ConfigurationAggregatorName="test_aggregator_two_regions",
        AccountAggregationSources=[two_region_account_aggregation_source],
    )

    result = config_client.list_discovered_resources(resourceType="AWS::IAM::Role")
    assert not result["resourceIdentifiers"]

    # Make 10 roles
    roles = []
    num_roles = 10
    for ix in range(1, num_roles + 1):
        this_role = iam_client.create_role(
            RoleName="role{}".format(ix),
            Path="/",
            Description="role{}".format(ix),
            # NOTE(review): json.dumps of a *string* yields a JSON string
            # literal rather than a policy object; kept as-is because the test
            # only needs some document to be present - confirm if the mock
            # ever starts validating this document.
            AssumeRolePolicyDocument=json.dumps("{ }"),
        )
        roles.append(
            {
                "id": this_role["Role"]["RoleId"],
                "name": this_role["Role"]["RoleName"],
            }
        )

    assert len(roles) == num_roles

    # Test non-aggregated query: (everything is getting a random id, so we can't test names by ordering)
    result = config_client.list_discovered_resources(
        resourceType="AWS::IAM::Role", limit=1
    )
    first_result = result["resourceIdentifiers"][0]["resourceId"]
    assert result["resourceIdentifiers"][0]["resourceType"] == "AWS::IAM::Role"
    assert len(first_result) == len(random_resource_id())

    # Test non-aggregated pagination
    assert (
        config_client.list_discovered_resources(
            resourceType="AWS::IAM::Role", limit=1, nextToken=result["nextToken"],
        )["resourceIdentifiers"][0]["resourceId"]
    ) != first_result

    # Test aggregated query - by `Limit=len(CONFIG_REGIONS)`, we should get a single role duplicated across all regions
    agg_result = config_client.list_aggregate_discovered_resources(
        ResourceType="AWS::IAM::Role",
        ConfigurationAggregatorName="test_aggregator",
        Limit=len(CONFIG_REGIONS),
    )
    assert len(agg_result["ResourceIdentifiers"]) == len(CONFIG_REGIONS)

    agg_name = None
    agg_id = None
    for resource in agg_result["ResourceIdentifiers"]:
        assert resource["ResourceType"] == "AWS::IAM::Role"
        assert resource["SourceRegion"] in CONFIG_REGIONS
        assert resource["SourceAccountId"] == ACCOUNT_ID
        # Every entry after the first must describe the same role as its predecessor.
        if agg_id:
            assert resource["ResourceId"] == agg_id
        if agg_name:
            assert resource["ResourceName"] == agg_name
        agg_name = resource["ResourceName"]
        agg_id = resource["ResourceId"]

    # Test aggregated pagination
    for resource in config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator",
        ResourceType="AWS::IAM::Role",
        NextToken=agg_result["NextToken"],
    )["ResourceIdentifiers"]:
        assert resource["ResourceId"] != agg_id

    # Test non-aggregated resource name/id filter
    assert (
        config_client.list_discovered_resources(
            resourceType="AWS::IAM::Role", resourceName=roles[1]["name"], limit=1,
        )["resourceIdentifiers"][0]["resourceName"]
        == roles[1]["name"]
    )

    assert (
        config_client.list_discovered_resources(
            resourceType="AWS::IAM::Role", resourceIds=[roles[0]["id"]], limit=1,
        )["resourceIdentifiers"][0]["resourceName"]
        == roles[0]["name"]
    )

    # Test aggregated resource name/id filter
    agg_name_filter = config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator",
        ResourceType="AWS::IAM::Role",
        Filters={"ResourceName": roles[5]["name"]},
    )
    assert len(agg_name_filter["ResourceIdentifiers"]) == len(CONFIG_REGIONS)
    assert agg_name_filter["ResourceIdentifiers"][0]["ResourceId"] == roles[5]["id"]

    agg_name_filter = config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator_two_regions",
        ResourceType="AWS::IAM::Role",
        Filters={"ResourceName": roles[5]["name"]},
    )
    assert len(agg_name_filter["ResourceIdentifiers"]) == len(
        two_region_account_aggregation_source["AwsRegions"]
    )
    assert agg_name_filter["ResourceIdentifiers"][0]["ResourceId"] == roles[5]["id"]

    agg_id_filter = config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator",
        ResourceType="AWS::IAM::Role",
        Filters={"ResourceId": roles[4]["id"]},
    )
    assert len(agg_id_filter["ResourceIdentifiers"]) == len(CONFIG_REGIONS)
    assert agg_id_filter["ResourceIdentifiers"][0]["ResourceName"] == roles[4]["name"]

    agg_id_filter = config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator_two_regions",
        ResourceType="AWS::IAM::Role",
        Filters={"ResourceId": roles[5]["id"]},
    )
    assert len(agg_id_filter["ResourceIdentifiers"]) == len(
        two_region_account_aggregation_source["AwsRegions"]
    )
    assert agg_id_filter["ResourceIdentifiers"][0]["ResourceName"] == roles[5]["name"]

    # Test aggregated resource name/id filter combined with a limit
    assert (
        config_client.list_aggregate_discovered_resources(
            ConfigurationAggregatorName="test_aggregator",
            ResourceType="AWS::IAM::Role",
            Filters={"ResourceName": roles[5]["name"]},
            Limit=1,
        )["ResourceIdentifiers"][0]["ResourceName"]
        == roles[5]["name"]
    )

    assert (
        config_client.list_aggregate_discovered_resources(
            ConfigurationAggregatorName="test_aggregator",
            ResourceType="AWS::IAM::Role",
            Filters={"ResourceId": roles[4]["id"]},
            Limit=1,
        )["ResourceIdentifiers"][0]["ResourceName"]
        == roles[4]["name"]
    )

    # Test name/id filter with pagination
    requested_ids = [roles[1]["id"], roles[2]["id"]]
    requested_names = [roles[1]["name"], roles[2]["name"]]

    first_call = config_client.list_discovered_resources(
        resourceType="AWS::IAM::Role", resourceIds=requested_ids, limit=1,
    )

    assert first_call["nextToken"] in requested_ids
    assert first_call["resourceIdentifiers"][0]["resourceName"] in requested_names

    second_call = config_client.list_discovered_resources(
        resourceType="AWS::IAM::Role",
        resourceIds=requested_ids,
        limit=1,
        nextToken=first_call["nextToken"],
    )
    assert "nextToken" not in second_call
    # The second page must be checked as well: it has to be the *other*
    # requested role, not merely something different from the first page.
    assert second_call["resourceIdentifiers"][0]["resourceName"] in requested_names
    assert (
        first_call["resourceIdentifiers"][0]["resourceName"]
        != second_call["resourceIdentifiers"][0]["resourceName"]
    )

    # Test non-aggregated batch get
    assert (
        config_client.batch_get_resource_config(
            resourceKeys=[
                {"resourceType": "AWS::IAM::Role", "resourceId": roles[0]["id"]}
            ]
        )["baseConfigurationItems"][0]["resourceName"]
        == roles[0]["name"]
    )

    # Test aggregated batch get
    assert (
        config_client.batch_get_aggregate_resource_config(
            ConfigurationAggregatorName="test_aggregator",
            ResourceIdentifiers=[
                {
                    "SourceAccountId": ACCOUNT_ID,
                    "SourceRegion": "us-east-1",
                    "ResourceId": roles[1]["id"],
                    "ResourceType": "AWS::IAM::Role",
                }
            ],
        )["BaseConfigurationItems"][0]["resourceName"]
        == roles[1]["name"]
    )
@mock_iam
def test_policy_list_config_discovered_resources():
    """Check ``policy_config_query.list_config_service_resources`` filtering.

    Three managed policies are created directly on the backend, then listed
    without a filter, by resource ids, by resource name and by both at once.
    """
    from moto.iam.config import policy_config_query

    # Without any policies
    assert policy_config_query.list_config_service_resources(None, None, 100, None) == (
        [],
        None,
    )

    basic_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {"Action": ["ec2:DeleteKeyPair"], "Effect": "Deny", "Resource": "*"}
        ],
    }

    # Make 3 policies
    policies = []
    num_policies = 3
    for ix in range(1, num_policies + 1):
        this_policy = policy_config_query.backends["global"].create_policy(
            description="policy{}".format(ix),
            path="",
            policy_document=json.dumps(basic_policy),
            policy_name="policy{}".format(ix),
        )
        policies.append({"id": this_policy.id, "name": this_policy.name})

    assert len(policies) == num_policies

    # We expect the backend to have arns as their keys
    for backend_key in policy_config_query.backends["global"].managed_policies:
        assert backend_key.startswith("arn:aws:iam::")

    result = policy_config_query.list_config_service_resources(None, None, 100, None)[0]
    assert len(result) == num_policies

    # Policy ids are random, so only membership (not ordering) can be checked.
    policy = result[0]
    assert policy["type"] == "AWS::IAM::Policy"
    assert policy["id"] in [created["id"] for created in policies]
    assert policy["name"] in [created["name"] for created in policies]
    assert policy["region"] == "global"

    # test passing list of resource ids
    resource_ids = policy_config_query.list_config_service_resources(
        [policies[0]["id"], policies[1]["id"]], None, 100, None
    )[0]
    assert len(resource_ids) == 2

    # test passing a single resource name
    resource_name = policy_config_query.list_config_service_resources(
        None, policies[0]["name"], 100, None
    )[0]
    assert len(resource_name) == 1
    assert resource_name[0]["id"] == policies[0]["id"]
    assert resource_name[0]["name"] == policies[0]["name"]

    # test passing a single resource name AND some resource id's
    both_filter_good = policy_config_query.list_config_service_resources(
        [policies[0]["id"], policies[1]["id"]], policies[0]["name"], 100, None
    )[0]
    assert len(both_filter_good) == 1
    assert both_filter_good[0]["id"] == policies[0]["id"]
    assert both_filter_good[0]["name"] == policies[0]["name"]

    # A name that does not belong to any of the requested ids matches nothing.
    both_filter_bad = policy_config_query.list_config_service_resources(
        [policies[0]["id"], policies[1]["id"]], policies[2]["name"], 100, None
    )[0]
    assert len(both_filter_bad) == 0
@mock_iam
def test_policy_config_dict():
    """Check the AWS Config dictionary produced by a managed policy.

    A policy is created, given a second (default) version and attached to a
    role; the resulting ``to_config_dict()`` output is then verified.
    """
    from moto.iam.config import role_config_query, policy_config_query
    from moto.iam.utils import random_policy_id

    # Without any policies
    assert not policy_config_query.get_config_resource(
        "arn:aws:iam::123456789012:policy/basic_policy"
    )
    assert policy_config_query.list_config_service_resources(None, None, 100, None) == (
        [],
        None,
    )

    basic_policy = {
        "Version": "2012-10-17",
        "Statement": [{"Action": ["ec2:*"], "Effect": "Allow", "Resource": "*"}],
    }

    basic_policy_v2 = {
        "Version": "2012-10-17",
        "Statement": [
            {"Action": ["ec2:*", "s3:*"], "Effect": "Allow", "Resource": "*"}
        ],
    }

    policy_arn = (
        policy_config_query.backends["global"]
        .create_policy(
            description="basic_policy",
            path="/",
            policy_document=json.dumps(basic_policy),
            policy_name="basic_policy",
        )
        .arn
    )

    policy_id = policy_config_query.list_config_service_resources(
        None, None, 100, None
    )[0][0]["id"]
    assert len(policy_id) == len(random_policy_id())

    assert policy_arn == "arn:aws:iam::123456789012:policy/basic_policy"
    assert policy_config_query.get_config_resource(policy_id) is not None

    # Create a new version
    policy_config_query.backends["global"].create_policy_version(
        policy_arn, json.dumps(basic_policy_v2), "true"
    )

    # Create role to trigger attachment
    role_config_query.backends["global"].create_role(
        role_name="role_with_attached_policy",
        assume_role_policy_document=None,
        path="/",
        permissions_boundary=None,
        description="role_with_attached_policy",
        tags=[],
        max_session_duration=3600,
    )
    role_config_query.backends["global"].attach_role_policy(
        policy_arn, "role_with_attached_policy"
    )

    # ``policy_arn`` was asserted above to equal the expected literal ARN.
    policy = (
        role_config_query.backends["global"]
        .managed_policies[policy_arn]
        .to_config_dict()
    )
    assert policy["version"] == "1.3"
    assert policy["configurationItemCaptureTime"] is not None
    assert policy["configurationItemStatus"] == "OK"
    assert policy["configurationStateId"] is not None
    assert policy["arn"] == "arn:aws:iam::123456789012:policy/basic_policy"
    assert policy["resourceType"] == "AWS::IAM::Policy"
    assert len(policy["resourceId"]) == len(random_policy_id())
    assert policy["resourceName"] == "basic_policy"
    assert policy["awsRegion"] == "global"
    assert policy["availabilityZone"] == "Not Applicable"
    assert policy["resourceCreationTime"] is not None
    assert policy["configuration"]["policyName"] == policy["resourceName"]
    assert policy["configuration"]["policyId"] == policy["resourceId"]
    assert policy["configuration"]["arn"] == policy["arn"]
    assert policy["configuration"]["path"] == "/"
    assert policy["configuration"]["defaultVersionId"] == "v2"
    assert policy["configuration"]["attachmentCount"] == 1
    assert policy["configuration"]["permissionsBoundaryUsageCount"] == 0
    assert policy["configuration"]["isAttachable"] is True
    assert policy["configuration"]["description"] == "basic_policy"
    assert policy["configuration"]["createDate"] is not None
    assert policy["configuration"]["updateDate"] is not None
    # ``createDate`` values are generated at runtime, so they are echoed back
    # from the result rather than compared against a fixed value.
    assert policy["configuration"]["policyVersionList"] == [
        {
            "document": str(parse.quote(json.dumps(basic_policy))),
            "versionId": "v1",
            "isDefaultVersion": False,
            "createDate": policy["configuration"]["policyVersionList"][0]["createDate"],
        },
        {
            "document": str(parse.quote(json.dumps(basic_policy_v2))),
            "versionId": "v2",
            "isDefaultVersion": True,
            "createDate": policy["configuration"]["policyVersionList"][1]["createDate"],
        },
    ]
    assert policy["supplementaryConfiguration"] == {}
@mock_iam
@mock_config
def test_policy_config_client():
    """Exercise IAM managed policies through the boto3 ``config`` client.

    Covers plain and aggregated discovery, pagination, name/id filters and the
    batch-get calls for resources of type ``AWS::IAM::Policy``.
    """
    from moto.iam.models import ACCOUNT_ID
    from moto.iam.utils import random_policy_id

    CONFIG_REGIONS = boto3.Session().get_available_regions("config")

    basic_policy = {
        "Version": "2012-10-17",
        "Statement": [{"Action": ["ec2:*"], "Effect": "Allow", "Resource": "*"}],
    }

    iam_client = boto3.client("iam", region_name="us-west-2")
    config_client = boto3.client("config", region_name="us-west-2")

    all_account_aggregation_source = {
        "AccountIds": [ACCOUNT_ID],
        "AllAwsRegions": True,
    }

    two_region_account_aggregation_source = {
        "AccountIds": [ACCOUNT_ID],
        "AwsRegions": ["us-east-1", "us-west-2"],
    }

    config_client.put_configuration_aggregator(
        ConfigurationAggregatorName="test_aggregator",
        AccountAggregationSources=[all_account_aggregation_source],
    )

    config_client.put_configuration_aggregator(
        ConfigurationAggregatorName="test_aggregator_two_regions",
        AccountAggregationSources=[two_region_account_aggregation_source],
    )

    result = config_client.list_discovered_resources(resourceType="AWS::IAM::Policy")
    assert not result["resourceIdentifiers"]

    # Make 10 policies
    policies = []
    num_policies = 10
    for ix in range(1, num_policies + 1):
        this_policy = iam_client.create_policy(
            PolicyName="policy{}".format(ix),
            Path="/",
            PolicyDocument=json.dumps(basic_policy),
            Description="policy{}".format(ix),
        )
        policies.append(
            {
                "id": this_policy["Policy"]["PolicyId"],
                "name": this_policy["Policy"]["PolicyName"],
            }
        )

    assert len(policies) == num_policies

    # Test non-aggregated query: (everything is getting a random id, so we can't test names by ordering)
    result = config_client.list_discovered_resources(
        resourceType="AWS::IAM::Policy", limit=1
    )
    first_result = result["resourceIdentifiers"][0]["resourceId"]
    assert result["resourceIdentifiers"][0]["resourceType"] == "AWS::IAM::Policy"
    assert len(first_result) == len(random_policy_id())

    # Test non-aggregated pagination
    assert (
        config_client.list_discovered_resources(
            resourceType="AWS::IAM::Policy", limit=1, nextToken=result["nextToken"],
        )["resourceIdentifiers"][0]["resourceId"]
    ) != first_result

    # Test aggregated query - by `Limit=len(CONFIG_REGIONS)`, we should get a single policy duplicated across all regions
    agg_result = config_client.list_aggregate_discovered_resources(
        ResourceType="AWS::IAM::Policy",
        ConfigurationAggregatorName="test_aggregator",
        Limit=len(CONFIG_REGIONS),
    )
    assert len(agg_result["ResourceIdentifiers"]) == len(CONFIG_REGIONS)

    agg_name = None
    agg_id = None
    for resource in agg_result["ResourceIdentifiers"]:
        assert resource["ResourceType"] == "AWS::IAM::Policy"
        assert resource["SourceRegion"] in CONFIG_REGIONS
        assert resource["SourceAccountId"] == ACCOUNT_ID
        # Every entry after the first must describe the same policy as its predecessor.
        if agg_id:
            assert resource["ResourceId"] == agg_id
        if agg_name:
            assert resource["ResourceName"] == agg_name
        agg_name = resource["ResourceName"]
        agg_id = resource["ResourceId"]

    # Test aggregated pagination
    for resource in config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator",
        ResourceType="AWS::IAM::Policy",
        Limit=1,
        NextToken=agg_result["NextToken"],
    )["ResourceIdentifiers"]:
        assert resource["ResourceId"] != agg_id

    # Test non-aggregated resource name/id filter
    assert (
        config_client.list_discovered_resources(
            resourceType="AWS::IAM::Policy", resourceName=policies[1]["name"], limit=1,
        )["resourceIdentifiers"][0]["resourceName"]
        == policies[1]["name"]
    )

    assert (
        config_client.list_discovered_resources(
            resourceType="AWS::IAM::Policy", resourceIds=[policies[0]["id"]], limit=1,
        )["resourceIdentifiers"][0]["resourceName"]
        == policies[0]["name"]
    )

    # Test aggregated resource name/id filter
    agg_name_filter = config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator",
        ResourceType="AWS::IAM::Policy",
        Filters={"ResourceName": policies[5]["name"]},
    )
    assert len(agg_name_filter["ResourceIdentifiers"]) == len(CONFIG_REGIONS)
    assert (
        agg_name_filter["ResourceIdentifiers"][0]["ResourceName"] == policies[5]["name"]
    )

    agg_name_filter = config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator_two_regions",
        ResourceType="AWS::IAM::Policy",
        Filters={"ResourceName": policies[5]["name"]},
    )
    assert len(agg_name_filter["ResourceIdentifiers"]) == len(
        two_region_account_aggregation_source["AwsRegions"]
    )
    assert agg_name_filter["ResourceIdentifiers"][0]["ResourceId"] == policies[5]["id"]

    agg_id_filter = config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator",
        ResourceType="AWS::IAM::Policy",
        Filters={"ResourceId": policies[4]["id"]},
    )
    assert len(agg_id_filter["ResourceIdentifiers"]) == len(CONFIG_REGIONS)
    assert (
        agg_id_filter["ResourceIdentifiers"][0]["ResourceName"] == policies[4]["name"]
    )

    agg_id_filter = config_client.list_aggregate_discovered_resources(
        ConfigurationAggregatorName="test_aggregator_two_regions",
        ResourceType="AWS::IAM::Policy",
        Filters={"ResourceId": policies[5]["id"]},
    )
    assert len(agg_id_filter["ResourceIdentifiers"]) == len(
        two_region_account_aggregation_source["AwsRegions"]
    )
    assert (
        agg_id_filter["ResourceIdentifiers"][0]["ResourceName"] == policies[5]["name"]
    )

    # Test name/id filter with pagination
    requested_ids = [policies[1]["id"], policies[2]["id"]]
    requested_names = [policies[1]["name"], policies[2]["name"]]

    first_call = config_client.list_discovered_resources(
        resourceType="AWS::IAM::Policy", resourceIds=requested_ids, limit=1,
    )

    assert first_call["nextToken"] in requested_ids
    assert first_call["resourceIdentifiers"][0]["resourceName"] in requested_names

    second_call = config_client.list_discovered_resources(
        resourceType="AWS::IAM::Policy",
        resourceIds=requested_ids,
        limit=1,
        nextToken=first_call["nextToken"],
    )
    assert "nextToken" not in second_call
    # The second page must be checked as well: it has to be the *other*
    # requested policy, not merely something different from the first page.
    assert second_call["resourceIdentifiers"][0]["resourceName"] in requested_names
    assert (
        first_call["resourceIdentifiers"][0]["resourceName"]
        != second_call["resourceIdentifiers"][0]["resourceName"]
    )

    # Test non-aggregated batch get
    assert (
        config_client.batch_get_resource_config(
            resourceKeys=[
                {"resourceType": "AWS::IAM::Policy", "resourceId": policies[7]["id"]}
            ]
        )["baseConfigurationItems"][0]["resourceName"]
        == policies[7]["name"]
    )

    # Test aggregated batch get
    assert (
        config_client.batch_get_aggregate_resource_config(
            ConfigurationAggregatorName="test_aggregator",
            ResourceIdentifiers=[
                {
                    "SourceAccountId": ACCOUNT_ID,
                    "SourceRegion": "us-east-2",
                    "ResourceId": policies[8]["id"],
                    "ResourceType": "AWS::IAM::Policy",
                }
            ],
        )["BaseConfigurationItems"][0]["resourceName"]
        == policies[8]["name"]
    )
@mock_iam()
def test_list_roles_with_more_than_100_roles_no_max_items_defaults_to_100():
    """With 150 roles and no ``MaxItems``, ``list_roles`` returns a truncated page of 100."""
    client = boto3.client("iam", region_name="us-east-1")

    role_count = 150
    index = 0
    while index < role_count:
        client.create_role(
            RoleName="test_role_{}".format(index),
            AssumeRolePolicyDocument="some policy",
        )
        index += 1

    listing = client.list_roles()
    returned_roles = listing["Roles"]

    assert listing["IsTruncated"] is True
    assert len(returned_roles) == 100
@mock_iam()
def test_list_roles_max_item_and_marker_values_adhered():
    """``MaxItems`` caps the first page and ``Marker`` resumes with the remaining roles."""
    client = boto3.client("iam", region_name="us-east-1")

    for role_name in ["test_role_{}".format(index) for index in range(10)]:
        client.create_role(RoleName=role_name, AssumeRolePolicyDocument="some policy")

    # First page: limited to two of the ten roles.
    first_page = client.list_roles(MaxItems=2)
    first_roles = first_page["Roles"]
    assert first_page["IsTruncated"] is True
    assert len(first_roles) == 2

    # Second page: continue from the marker and receive the other eight.
    second_page = client.list_roles(Marker=first_page["Marker"])
    second_roles = second_page["Roles"]
    assert second_page["IsTruncated"] is False
    assert len(second_roles) == 8
@mock_iam()
def test_list_roles_path_prefix_value_adhered():
    """``PathPrefix`` restricts ``list_roles`` to roles created under that path."""
    client = boto3.client("iam", region_name="us-east-1")

    # One role without an explicit path, one under "/TestPath/".
    role_definitions = [
        {
            "RoleName": "test_role_without_path",
            "AssumeRolePolicyDocument": "some policy",
        },
        {
            "RoleName": "test_role_with_path",
            "AssumeRolePolicyDocument": "some policy",
            "Path": "/TestPath/",
        },
    ]
    for definition in role_definitions:
        client.create_role(**definition)

    matching_roles = client.list_roles(PathPrefix="/TestPath/")["Roles"]

    assert len(matching_roles) == 1
    assert matching_roles[0]["RoleName"] == "test_role_with_path"
@mock_iam()
def test_list_roles_none_found_returns_empty_list():
    """With no roles present, ``list_roles`` yields an empty list for every argument variant."""
    client = boto3.client("iam", region_name="us-east-1")

    # Same call order as before: no arguments, path prefix, marker, max items.
    argument_variants = (
        {},
        {"PathPrefix": "/TestPath"},
        {"Marker": "10"},
        {"MaxItems": 10},
    )
    for arguments in argument_variants:
        listing = client.list_roles(**arguments)
        found = listing["Roles"]
        assert len(found) == 0
@mock_iam()
def test_create_user_with_tags():
    """``create_user`` echoes the supplied tags and omits ``Tags`` when none are given."""
    client = boto3.client("iam", region_name="us-east-1")
    tagged_user = "test-user"
    expected_tags = [
        {"Key": "somekey", "Value": "somevalue"},
        {"Key": "someotherkey", "Value": "someothervalue"},
    ]

    # Tags appear in the create response and in a later tag listing.
    created = client.create_user(UserName=tagged_user, Tags=expected_tags)
    assert created["User"]["Tags"] == expected_tags
    listed = client.list_user_tags(UserName=tagged_user)
    assert listed["Tags"] == expected_tags

    # A user created without tags has no "Tags" key in the response.
    untagged = client.create_user(UserName="test-create-user-no-tags")
    assert "Tags" not in untagged["User"]