861 lines
29 KiB
Python
861 lines
29 KiB
Python
import json
|
|
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
import pytest
|
|
|
|
from moto import mock_iam, mock_ec2, mock_s3, mock_sts, mock_ssm, mock_elbv2, mock_rds
|
|
from moto.core import set_initial_no_auth_action_count
|
|
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
|
|
from uuid import uuid4
|
|
|
|
|
|
@mock_iam
def create_user_with_access_key(user_name="test-user"):
    """Create an IAM user and issue an access key for it.

    Returns the ``AccessKey`` entry of the ``create_access_key`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName=user_name)
    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
|
|
|
|
|
|
@mock_iam
def create_user_with_access_key_and_inline_policy(
    user_name, policy_document, policy_name="policy1"
):
    """Create an IAM user carrying one inline policy and issue an access key.

    ``policy_document`` is serialised with ``json.dumps`` and stored on the
    user under ``policy_name``. Returns the ``AccessKey`` entry of the
    ``create_access_key`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName=user_name)
    serialized_policy = json.dumps(policy_document)
    iam.put_user_policy(
        UserName=user_name,
        PolicyName=policy_name,
        PolicyDocument=serialized_policy,
    )
    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
|
|
|
|
|
|
@mock_iam
def create_user_with_access_key_and_attached_policy(
    user_name, policy_document, policy_name="policy1"
):
    """Create an IAM user with one attached managed policy and issue a key.

    A managed policy named ``policy_name`` is created from
    ``policy_document`` and attached to the new user. Returns the
    ``AccessKey`` entry of the ``create_access_key`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName=user_name)
    created_policy = iam.create_policy(
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy_document),
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_user_policy(UserName=user_name, PolicyArn=managed_policy_arn)
    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
|
|
|
|
|
|
@mock_iam
def create_user_with_access_key_and_multiple_policies(
    user_name,
    inline_policy_document,
    attached_policy_document,
    inline_policy_name="policy1",
    attached_policy_name="policy1",
):
    """Create an IAM user with both a managed and an inline policy.

    The managed policy is created and attached first, then the inline
    policy is put on the user. Returns the ``AccessKey`` entry of the
    ``create_access_key`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName=user_name)

    # Managed (attached) policy.
    created_policy = iam.create_policy(
        PolicyName=attached_policy_name,
        PolicyDocument=json.dumps(attached_policy_document),
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_user_policy(UserName=user_name, PolicyArn=managed_policy_arn)

    # Inline policy.
    iam.put_user_policy(
        UserName=user_name,
        PolicyName=inline_policy_name,
        PolicyDocument=json.dumps(inline_policy_document),
    )

    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
|
|
|
|
|
|
def create_group_with_attached_policy_and_add_user(
    user_name, policy_document, group_name="test-group", policy_name=None
):
    """Create a group with an attached managed policy and add a user to it.

    When ``policy_name`` is falsy a random UUID string is used as the
    managed policy's name.
    """
    effective_policy_name = policy_name if policy_name else str(uuid4())
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_group(GroupName=group_name)
    created_policy = iam.create_policy(
        PolicyName=effective_policy_name,
        PolicyDocument=json.dumps(policy_document),
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_group_policy(GroupName=group_name, PolicyArn=managed_policy_arn)
    iam.add_user_to_group(GroupName=group_name, UserName=user_name)
|
|
|
|
|
|
def create_group_with_inline_policy_and_add_user(
    user_name, policy_document, group_name="test-group", policy_name="policy1"
):
    """Create a group with an inline policy and add a user to it.

    ``policy_document`` is serialised with ``json.dumps`` and stored on the
    group under ``policy_name``.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_group(GroupName=group_name)
    serialized_policy = json.dumps(policy_document)
    iam.put_group_policy(
        GroupName=group_name,
        PolicyName=policy_name,
        PolicyDocument=serialized_policy,
    )
    iam.add_user_to_group(GroupName=group_name, UserName=user_name)
|
|
|
|
|
|
def create_group_with_multiple_policies_and_add_user(
    user_name,
    inline_policy_document,
    attached_policy_document,
    group_name="test-group",
    inline_policy_name="policy1",
    attached_policy_name=None,
):
    """Create a group with an inline and a managed policy, then add a user.

    The inline policy is put on the group first, then the managed policy is
    created and attached. When ``attached_policy_name`` is falsy a random
    UUID string is used as the managed policy's name.
    """
    managed_policy_name = (
        attached_policy_name if attached_policy_name else str(uuid4())
    )
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_group(GroupName=group_name)

    # Inline policy.
    iam.put_group_policy(
        GroupName=group_name,
        PolicyName=inline_policy_name,
        PolicyDocument=json.dumps(inline_policy_document),
    )

    # Managed (attached) policy.
    created_policy = iam.create_policy(
        PolicyName=managed_policy_name,
        PolicyDocument=json.dumps(attached_policy_document),
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_group_policy(GroupName=group_name, PolicyArn=managed_policy_arn)

    iam.add_user_to_group(GroupName=group_name, UserName=user_name)
|
|
|
|
|
|
@mock_iam
@mock_sts
def create_role_with_attached_policy_and_assume_it(
    role_name,
    trust_policy_document,
    policy_document,
    session_name="session1",
    policy_name="policy1",
):
    """Create a role with an attached managed policy and assume it.

    Returns the ``Credentials`` entry of the ``assume_role`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    sts = boto3.client("sts", region_name="us-east-1")

    created_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy_document),
    )
    role_arn = created_role["Role"]["Arn"]

    created_policy = iam.create_policy(
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy_document),
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_role_policy(RoleName=role_name, PolicyArn=managed_policy_arn)

    assumed = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    return assumed["Credentials"]
|
|
|
|
|
|
@mock_iam
@mock_sts
def create_role_with_inline_policy_and_assume_it(
    role_name,
    trust_policy_document,
    policy_document,
    session_name="session1",
    policy_name="policy1",
):
    """Create a role with an inline policy and assume it.

    Returns the ``Credentials`` entry of the ``assume_role`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    sts = boto3.client("sts", region_name="us-east-1")

    created_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy_document),
    )
    role_arn = created_role["Role"]["Arn"]

    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy_document),
    )

    assumed = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    return assumed["Credentials"]
|
|
|
|
|
|
@set_initial_no_auth_action_count(0)
@mock_iam
def test_invalid_client_token_id():
    """Expect InvalidClientTokenId (HTTP 403) from IAM for made-up credentials."""
    bogus_credentials = {
        "aws_access_key_id": "invalid",
        "aws_secret_access_key": "invalid",
    }
    iam = boto3.client("iam", region_name="us-east-1", **bogus_credentials)

    with pytest.raises(ClientError) as exc_info:
        iam.get_user()

    response = exc_info.value.response
    error = response["Error"]
    assert error["Code"] == "InvalidClientTokenId"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert error["Message"] == "The security token included in the request is invalid."
|
|
|
|
|
|
@set_initial_no_auth_action_count(0)
@mock_ec2
def test_auth_failure():
    """Expect AuthFailure (HTTP 401) from EC2 for made-up credentials."""
    bogus_credentials = {
        "aws_access_key_id": "invalid",
        "aws_secret_access_key": "invalid",
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **bogus_credentials)

    with pytest.raises(ClientError) as exc_info:
        ec2.describe_instances()

    response = exc_info.value.response
    error = response["Error"]
    assert error["Code"] == "AuthFailure"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 401
    expected_message = "AWS was not able to validate the provided access credentials"
    assert error["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(2)
@mock_iam
def test_signature_does_not_match():
    """Expect SignatureDoesNotMatch (HTTP 403) from IAM for a wrong secret key."""
    access_key = create_user_with_access_key()
    # Real access key id, deliberately wrong secret.
    iam = boto3.client(
        "iam",
        region_name="us-east-1",
        aws_access_key_id=access_key["AccessKeyId"],
        aws_secret_access_key="invalid",
    )

    with pytest.raises(ClientError) as exc_info:
        iam.get_user()

    response = exc_info.value.response
    expected_message = "The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details."
    assert response["Error"]["Code"] == "SignatureDoesNotMatch"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(2)
@mock_ec2
def test_auth_failure_with_valid_access_key_id():
    """Expect AuthFailure (HTTP 401) from EC2 for a real key id with a wrong secret."""
    access_key = create_user_with_access_key()
    # Real access key id, deliberately wrong secret.
    ec2 = boto3.client(
        "ec2",
        region_name="us-east-1",
        aws_access_key_id=access_key["AccessKeyId"],
        aws_secret_access_key="invalid",
    )

    with pytest.raises(ClientError) as exc_info:
        ec2.describe_instances()

    response = exc_info.value.response
    expected_message = "AWS was not able to validate the provided access credentials"
    assert response["Error"]["Code"] == "AuthFailure"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 401
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(2)
@mock_ec2
def test_access_denied_with_no_policy():
    """Expect AccessDenied (HTTP 403) when the user has no policy at all."""
    user_name = "test-user"
    access_key = create_user_with_access_key(user_name)
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **key_kwargs)

    with pytest.raises(ClientError) as exc_info:
        ec2.describe_instances()

    response = exc_info.value.response
    expected_message = f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:DescribeInstances"
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_with_not_allowing_policy():
    """Expect AccessDenied when the only policy allows a different action set.

    The user may perform ``ec2:Run*`` but the test calls DescribeInstances.
    """
    user_name = "test-user"
    allow_run_only = {"Effect": "Allow", "Action": ["ec2:Run*"], "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_run_only],
    }
    access_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **key_kwargs)

    with pytest.raises(ClientError) as exc_info:
        ec2.describe_instances()

    response = exc_info.value.response
    expected_message = f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:DescribeInstances"
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_for_run_instances():
    """Expect AccessDenied for RunInstances when only ``ec2:Describe*`` is allowed."""
    # Regression guard for https://github.com/getmoto/moto/issues/2774
    # run-instances was broken between botocore versions 1.15.8 and 1.15.12:
    # the inclusion of '"idempotencyToken":true' in the response somehow
    # altered the signature and broke the authentication.
    # The test stays in place in case botocore breaks this again.
    user_name = "test-user"
    allow_describe_only = {
        "Effect": "Allow",
        "Action": ["ec2:Describe*"],
        "Resource": "*",
    }
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_describe_only],
    }
    access_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **key_kwargs)

    with pytest.raises(ClientError) as exc_info:
        ec2.run_instances(MaxCount=1, MinCount=1)

    response = exc_info.value.response
    expected_message = f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:RunInstances"
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_with_denying_policy():
    """Expect AccessDenied for CreateVpc when a Deny sits next to ``Allow ec2:*``."""
    user_name = "test-user"
    allow_all_ec2 = {"Effect": "Allow", "Action": ["ec2:*"], "Resource": "*"}
    deny_create_vpc = {"Effect": "Deny", "Action": "ec2:CreateVpc", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_all_ec2, deny_create_vpc],
    }
    access_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **key_kwargs)

    with pytest.raises(ClientError) as exc_info:
        ec2.create_vpc(CidrBlock="10.0.0.0/16")

    response = exc_info.value.response
    expected_message = f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:CreateVpc"
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_sts
def test_get_caller_identity_allowed_with_denying_policy():
    """Expect GetCallerIdentity to succeed even with an explicit Deny on it."""
    user_name = "test-user"
    deny_get_caller_identity = {
        "Effect": "Deny",
        "Action": "sts:GetCallerIdentity",
        "Resource": "*",
    }
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [deny_get_caller_identity],
    }
    access_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    sts = boto3.client("sts", region_name="us-east-1", **key_kwargs)

    caller_arn = sts.get_caller_identity()["Arn"]
    assert caller_arn == f"arn:aws:iam::{ACCOUNT_ID}:user/{user_name}"
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_allowed_with_wildcard_action():
    """Expect DescribeTags to succeed when ``ec2:Describe*`` is allowed."""
    user_name = "test-user"
    allow_describe = {"Effect": "Allow", "Action": "ec2:Describe*", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_describe],
    }
    access_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **key_kwargs)

    tags = ec2.describe_tags()["Tags"]
    assert tags == []
|
|
|
|
|
|
@set_initial_no_auth_action_count(4)
@mock_iam
def test_allowed_with_explicit_action_in_attached_policy():
    """Expect ListGroups to succeed when an attached policy allows it by name."""
    user_name = "test-user"
    allow_list_groups = {
        "Effect": "Allow",
        "Action": "iam:ListGroups",
        "Resource": "*",
    }
    attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_list_groups],
    }
    access_key = create_user_with_access_key_and_attached_policy(
        user_name, attached_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    iam = boto3.client("iam", region_name="us-east-1", **key_kwargs)

    groups = iam.list_groups()["Groups"]
    assert groups == []
|
|
|
|
|
|
@set_initial_no_auth_action_count(8)
@mock_s3
@mock_iam
def test_s3_access_denied_with_denying_attached_group_policy():
    """Expect S3 AccessDenied when a group's attached policy denies ``s3:List*``.

    The user's own attached policy allows ``s3:ListAllMyBuckets``.
    """
    user_name = "test-user"
    allow_list_buckets = {
        "Effect": "Allow",
        "Action": "s3:ListAllMyBuckets",
        "Resource": "*",
    }
    attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_list_buckets],
    }
    deny_all_listing = {"Effect": "Deny", "Action": "s3:List*", "Resource": "*"}
    group_attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [deny_all_listing],
    }
    access_key = create_user_with_access_key_and_attached_policy(
        user_name, attached_policy_document, policy_name="policy1"
    )
    create_group_with_attached_policy_and_add_user(
        user_name, group_attached_policy_document, policy_name="policy2"
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    s3 = boto3.client("s3", region_name="us-east-1", **key_kwargs)

    with pytest.raises(ClientError) as exc_info:
        s3.list_buckets()

    response = exc_info.value.response
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == "Access Denied"
|
|
|
|
|
|
@set_initial_no_auth_action_count(6)
@mock_s3
@mock_iam
def test_s3_access_denied_with_denying_inline_group_policy():
    """Expect S3 AccessDenied when a group's inline policy denies ``s3:GetObject``.

    The user's own inline policy allows every action.
    """
    user_name = "test-user"
    bucket_name = "test-bucket"
    allow_everything = {"Effect": "Allow", "Action": "*", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_everything],
    }
    deny_get_object = {"Effect": "Deny", "Action": "s3:GetObject", "Resource": "*"}
    group_inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [deny_get_object],
    }
    access_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    create_group_with_inline_policy_and_add_user(
        user_name, group_inline_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    s3 = boto3.client("s3", region_name="us-east-1", **key_kwargs)
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc_info:
        s3.get_object(Bucket=bucket_name, Key="sdfsdf")

    response = exc_info.value.response
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == "Access Denied"
|
|
|
|
|
|
@set_initial_no_auth_action_count(10)
@mock_iam
@mock_ec2
def test_access_denied_with_many_irrelevant_policies():
    """Expect AccessDenied for CreateKeyPair when no policy mentions that action.

    The user and its group carry four policies, none of which covers
    ``ec2:CreateKeyPair``.
    """
    user_name = "test-user"

    def single_statement_policy(effect, action):
        # Every policy in this test has the same one-statement shape.
        return {
            "Version": "2012-10-17",
            "Statement": [{"Effect": effect, "Action": action, "Resource": "*"}],
        }

    inline_policy_document = single_statement_policy("Allow", "ec2:Describe*")
    attached_policy_document = single_statement_policy("Allow", "s3:*")
    group_inline_policy_document = single_statement_policy("Deny", "iam:List*")
    group_attached_policy_document = single_statement_policy("Deny", "lambda:*")

    access_key = create_user_with_access_key_and_multiple_policies(
        user_name,
        inline_policy_document,
        attached_policy_document,
        attached_policy_name="policy1",
    )
    create_group_with_multiple_policies_and_add_user(
        user_name,
        group_inline_policy_document,
        group_attached_policy_document,
        attached_policy_name="policy2",
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **key_kwargs)

    with pytest.raises(ClientError) as exc_info:
        ec2.create_key_pair(KeyName="TestKey")

    response = exc_info.value.response
    expected_message = f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:CreateKeyPair"
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(4)
@mock_iam
@mock_sts
@mock_ec2
@mock_elbv2
def test_allowed_with_temporary_credentials():
    """Expect allowed EC2 and ELBv2 calls to succeed with assumed-role credentials."""
    role_name = "test-role"
    trust_statement = {
        "Effect": "Allow",
        "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
        "Action": "sts:AssumeRole",
    }
    trust_policy_document = {
        "Version": "2012-10-17",
        "Statement": trust_statement,
    }
    allowed_actions = [
        "elasticloadbalancing:CreateLoadBalancer",
        "ec2:DescribeSubnets",
    ]
    attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": allowed_actions, "Resource": "*"}
        ],
    }
    credentials = create_role_with_attached_policy_and_assume_it(
        role_name, trust_policy_document, attached_policy_document
    )

    # Both clients share the same temporary credentials.
    session_kwargs = {
        "region_name": "us-east-1",
        "aws_access_key_id": credentials["AccessKeyId"],
        "aws_secret_access_key": credentials["SecretAccessKey"],
        "aws_session_token": credentials["SessionToken"],
    }
    elbv2 = boto3.client("elbv2", **session_kwargs)
    ec2 = boto3.client("ec2", **session_kwargs)

    subnets = ec2.describe_subnets()["Subnets"]
    assert len(subnets) > 1
    first_two_subnet_ids = [subnet["SubnetId"] for subnet in subnets[:2]]
    resp = elbv2.create_load_balancer(Name="lb", Subnets=first_two_subnet_ids)
    assert len(resp["LoadBalancers"]) == 1
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_iam
@mock_sts
@mock_rds
def test_access_denied_with_temporary_credentials():
    """Expect AccessDenied for CreateDBInstance with a role allowing only ``rds:Describe*``."""
    role_name = "test-role"
    session_name = "test-session"
    trust_statement = {
        "Effect": "Allow",
        "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
        "Action": "sts:AssumeRole",
    }
    trust_policy_document = {
        "Version": "2012-10-17",
        "Statement": trust_statement,
    }
    allow_describe_only = {
        "Effect": "Allow",
        "Action": ["rds:Describe*"],
        "Resource": "*",
    }
    attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_describe_only],
    }
    credentials = create_role_with_inline_policy_and_assume_it(
        role_name, trust_policy_document, attached_policy_document, session_name
    )
    session_kwargs = {
        "aws_access_key_id": credentials["AccessKeyId"],
        "aws_secret_access_key": credentials["SecretAccessKey"],
        "aws_session_token": credentials["SessionToken"],
    }
    rds = boto3.client("rds", region_name="us-east-1", **session_kwargs)

    with pytest.raises(ClientError) as exc_info:
        rds.create_db_instance(
            DBInstanceIdentifier="test-db-instance",
            DBInstanceClass="db.t3",
            Engine="aurora-postgresql",
        )

    response = exc_info.value.response
    expected_message = f"User: arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{session_name} is not authorized to perform: rds:CreateDBInstance"
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_iam
def test_get_user_from_credentials():
    """Expect argument-less GetUser to return the user owning the access key."""
    user_name = "new-test-user"
    allow_all_iam = {"Effect": "Allow", "Action": "iam:*", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_all_iam],
    }
    access_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    iam = boto3.client("iam", region_name="us-east-1", **key_kwargs)

    returned_user = iam.get_user()["User"]
    assert returned_user["UserName"] == user_name
|
|
|
|
|
|
@set_initial_no_auth_action_count(0)
@mock_s3
def test_s3_invalid_access_key_id():
    """Expect InvalidAccessKeyId (HTTP 403) from S3 for made-up credentials."""
    bogus_credentials = {
        "aws_access_key_id": "invalid",
        "aws_secret_access_key": "invalid",
    }
    s3 = boto3.client("s3", region_name="us-east-1", **bogus_credentials)

    with pytest.raises(ClientError) as exc_info:
        s3.list_buckets()

    response = exc_info.value.response
    expected_message = (
        "The AWS Access Key Id you provided does not exist in our records."
    )
    assert response["Error"]["Code"] == "InvalidAccessKeyId"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_s3
@mock_iam
def test_s3_signature_does_not_match():
    """Expect SignatureDoesNotMatch (HTTP 403) from S3 PutObject for a wrong secret."""
    bucket_name = "test-bucket"
    access_key = create_user_with_access_key()
    # Real access key id, deliberately wrong secret.
    s3 = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id=access_key["AccessKeyId"],
        aws_secret_access_key="invalid",
    )
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc_info:
        s3.put_object(Bucket=bucket_name, Key="abc")

    response = exc_info.value.response
    expected_message = "The request signature we calculated does not match the signature you provided. Check your key and signing method."
    assert response["Error"]["Code"] == "SignatureDoesNotMatch"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == expected_message
|
|
|
|
|
|
@set_initial_no_auth_action_count(7)
@mock_s3
@mock_iam
def test_s3_access_denied_not_action():
    """Expect S3 AccessDenied when a group policy denies via ``NotAction``.

    The group's inline policy denies everything except ``iam:GetUser``,
    while the user's own inline policy allows every action.
    """
    user_name = "test-user"
    bucket_name = "test-bucket"
    allow_everything = {"Effect": "Allow", "Action": "*", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_everything],
    }
    deny_all_but_get_user = {
        "Effect": "Deny",
        "NotAction": "iam:GetUser",
        "Resource": "*",
    }
    group_inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [deny_all_but_get_user],
    }
    access_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    create_group_with_inline_policy_and_add_user(
        user_name, group_inline_policy_document
    )
    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    s3 = boto3.client("s3", region_name="us-east-1", **key_kwargs)
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc_info:
        s3.delete_object(Bucket=bucket_name, Key="sdfsdf")

    response = exc_info.value.response
    assert response["Error"]["Code"] == "AccessDenied"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 403
    assert response["Error"]["Message"] == "Access Denied"
|
|
|
|
|
|
@set_initial_no_auth_action_count(4)
@mock_iam
@mock_sts
@mock_s3
def test_s3_invalid_token_with_temporary_credentials():
    """Expect InvalidToken (HTTP 400) from S3 for a bad session token.

    The access key pair comes from a real assumed role; only the session
    token is replaced with ``"invalid"``.
    """
    role_name = "test-role"
    session_name = "test-session"
    bucket_name = "test-bucket-888"
    trust_statement = {
        "Effect": "Allow",
        "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
        "Action": "sts:AssumeRole",
    }
    trust_policy_document = {
        "Version": "2012-10-17",
        "Statement": trust_statement,
    }
    allow_everything = {"Effect": "Allow", "Action": ["*"], "Resource": "*"}
    attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_everything],
    }
    credentials = create_role_with_inline_policy_and_assume_it(
        role_name, trust_policy_document, attached_policy_document, session_name
    )
    s3 = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id=credentials["AccessKeyId"],
        aws_secret_access_key=credentials["SecretAccessKey"],
        aws_session_token="invalid",
    )
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc_info:
        s3.list_bucket_metrics_configurations(Bucket=bucket_name)

    response = exc_info.value.response
    error = response["Error"]
    assert error["Code"] == "InvalidToken"
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 400
    assert error["Message"] == "The provided token is malformed or otherwise invalid."
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_s3
@mock_iam
def test_allow_bucket_access_using_resource_arn() -> None:
    """Expect bucket calls to succeed only for the bucket ARN named in the policy.

    The policy grants ``s3:*`` on ``arn:aws:s3:::my_bucket``; the same calls
    against ``my_bucket2`` must raise ``ClientError``.
    """
    user_name = "test-user"
    bucket_level_grant = {
        "Action": ["s3:*"],
        "Effect": "Allow",
        "Resource": "arn:aws:s3:::my_bucket",
        "Sid": "BucketLevelGrants",
    }
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [bucket_level_grant],
    }
    access_key = create_user_with_access_key_and_inline_policy(user_name, policy_doc)

    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    s3 = boto3.client("s3", region_name="us-east-1", **key_kwargs)

    s3.create_bucket(Bucket="my_bucket")
    with pytest.raises(ClientError):
        s3.create_bucket(Bucket="my_bucket2")

    s3.head_bucket(Bucket="my_bucket")
    with pytest.raises(ClientError):
        s3.head_bucket(Bucket="my_bucket2")
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_s3
@mock_iam
def test_allow_key_access_using_resource_arn() -> None:
    """Expect PutObject to succeed only for the key matched by the policy.

    The policy grants ``s3:*`` on the bucket and on ``arn:aws:s3:::*/keyname``;
    writing the key ``unknown`` must raise ``ClientError``.
    """
    user_name = "test-user"
    key_level_grant = {
        "Action": ["s3:*"],
        "Effect": "Allow",
        "Resource": ["arn:aws:s3:::my_bucket", "arn:aws:s3:::*/keyname"],
        "Sid": "KeyLevelGrants",
    }
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [key_level_grant],
    }
    access_key = create_user_with_access_key_and_inline_policy(user_name, policy_doc)

    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    s3 = boto3.client("s3", region_name="us-east-1", **key_kwargs)

    s3.create_bucket(Bucket="my_bucket")
    s3.put_object(Bucket="my_bucket", Key="keyname", Body=b"test")
    with pytest.raises(ClientError):
        s3.put_object(Bucket="my_bucket", Key="unknown", Body=b"test")
|
|
|
|
|
|
@set_initial_no_auth_action_count(3)
@mock_ssm
@mock_iam
def test_ssm_service():
    """Expect SSM PutParameter to succeed when ``ssm:*`` is allowed on all resources."""
    user_name = "test-user"
    allow_all_ssm = {
        "Action": ["ssm:*"],
        "Effect": "Allow",
        "Resource": ["*"],
    }
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [allow_all_ssm],
    }
    access_key = create_user_with_access_key_and_inline_policy(user_name, policy_doc)

    key_kwargs = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }
    ssm = boto3.client("ssm", region_name="us-east-1", **key_kwargs)

    # The test passes as long as this call does not raise.
    ssm.put_parameter(Name="test", Value="value", Type="String")
|