# moto/tests/test_core/test_auth.py
#
# 759 lines
# 26 KiB
# Python

import json
import boto3
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
import pytest
from moto import mock_iam, mock_ec2, mock_s3, mock_sts, mock_elbv2, mock_rds
from moto.core import set_initial_no_auth_action_count
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from uuid import uuid4
@mock_iam
def create_user_with_access_key(user_name="test-user"):
    """Create an IAM user and return a newly created access key for it.

    Returns the ``AccessKey`` entry of the ``create_access_key`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName=user_name)
    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
@mock_iam
def create_user_with_access_key_and_inline_policy(
    user_name, policy_document, policy_name="policy1"
):
    """Create an IAM user with one inline policy and return its access key.

    ``policy_document`` is serialised with ``json.dumps`` and stored on the
    user under ``policy_name``. Returns the ``AccessKey`` entry of the
    ``create_access_key`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName=user_name)

    serialized_policy = json.dumps(policy_document)
    iam.put_user_policy(
        UserName=user_name,
        PolicyName=policy_name,
        PolicyDocument=serialized_policy,
    )

    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
@mock_iam
def create_user_with_access_key_and_attached_policy(
    user_name, policy_document, policy_name="policy1"
):
    """Create an IAM user with one attached managed policy; return its access key.

    A managed policy named ``policy_name`` is created from ``policy_document``
    and attached to the user. Returns the ``AccessKey`` entry of the
    ``create_access_key`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName=user_name)

    created_policy = iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_user_policy(UserName=user_name, PolicyArn=managed_policy_arn)

    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
@mock_iam
def create_user_with_access_key_and_multiple_policies(
    user_name,
    inline_policy_document,
    attached_policy_document,
    inline_policy_name="policy1",
    attached_policy_name="policy1",
):
    """Create an IAM user with both a managed and an inline policy.

    The managed policy is created and attached first, then the inline policy
    is put on the user. Returns the ``AccessKey`` entry of the
    ``create_access_key`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_user(UserName=user_name)

    # Managed (attached) policy.
    created_policy = iam.create_policy(
        PolicyName=attached_policy_name,
        PolicyDocument=json.dumps(attached_policy_document),
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_user_policy(UserName=user_name, PolicyArn=managed_policy_arn)

    # Inline policy.
    serialized_inline_policy = json.dumps(inline_policy_document)
    iam.put_user_policy(
        UserName=user_name,
        PolicyName=inline_policy_name,
        PolicyDocument=serialized_inline_policy,
    )

    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
def create_group_with_attached_policy_and_add_user(
    user_name, policy_document, group_name="test-group", policy_name=None
):
    """Create a group with an attached managed policy and add a user to it.

    When ``policy_name`` is falsy, a random ``uuid4`` string is used as the
    policy name. Nothing is returned.
    """
    policy_name = policy_name or str(uuid4())

    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_group(GroupName=group_name)

    created_policy = iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_group_policy(GroupName=group_name, PolicyArn=managed_policy_arn)

    iam.add_user_to_group(GroupName=group_name, UserName=user_name)
def create_group_with_inline_policy_and_add_user(
    user_name, policy_document, group_name="test-group", policy_name="policy1"
):
    """Create a group with one inline policy and add a user to it.

    ``policy_document`` is serialised with ``json.dumps`` and stored on the
    group under ``policy_name``. Nothing is returned.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_group(GroupName=group_name)

    serialized_policy = json.dumps(policy_document)
    iam.put_group_policy(
        GroupName=group_name,
        PolicyName=policy_name,
        PolicyDocument=serialized_policy,
    )

    iam.add_user_to_group(GroupName=group_name, UserName=user_name)
def create_group_with_multiple_policies_and_add_user(
    user_name,
    inline_policy_document,
    attached_policy_document,
    group_name="test-group",
    inline_policy_name="policy1",
    attached_policy_name=None,
):
    """Create a group with an inline and a managed policy, then add a user.

    The inline policy is put on the group first; the managed policy is then
    created and attached. When ``attached_policy_name`` is falsy, a random
    ``uuid4`` string is used as its name. Nothing is returned.
    """
    attached_policy_name = attached_policy_name or str(uuid4())

    iam = boto3.client("iam", region_name="us-east-1")
    iam.create_group(GroupName=group_name)

    # Inline policy.
    serialized_inline_policy = json.dumps(inline_policy_document)
    iam.put_group_policy(
        GroupName=group_name,
        PolicyName=inline_policy_name,
        PolicyDocument=serialized_inline_policy,
    )

    # Managed (attached) policy.
    created_policy = iam.create_policy(
        PolicyName=attached_policy_name,
        PolicyDocument=json.dumps(attached_policy_document),
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_group_policy(GroupName=group_name, PolicyArn=managed_policy_arn)

    iam.add_user_to_group(GroupName=group_name, UserName=user_name)
@mock_iam
@mock_sts
def create_role_with_attached_policy_and_assume_it(
    role_name,
    trust_policy_document,
    policy_document,
    session_name="session1",
    policy_name="policy1",
):
    """Create a role with an attached managed policy and assume it via STS.

    Returns the ``Credentials`` entry of the ``assume_role`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    sts = boto3.client("sts", region_name="us-east-1")

    created_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy_document),
    )
    role_arn = created_role["Role"]["Arn"]

    created_policy = iam.create_policy(
        PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
    )
    managed_policy_arn = created_policy["Policy"]["Arn"]
    iam.attach_role_policy(RoleName=role_name, PolicyArn=managed_policy_arn)

    assumed = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    return assumed["Credentials"]
@mock_iam
@mock_sts
def create_role_with_inline_policy_and_assume_it(
    role_name,
    trust_policy_document,
    policy_document,
    session_name="session1",
    policy_name="policy1",
):
    """Create a role with one inline policy and assume it via STS.

    Returns the ``Credentials`` entry of the ``assume_role`` response.
    """
    iam = boto3.client("iam", region_name="us-east-1")
    sts = boto3.client("sts", region_name="us-east-1")

    created_role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy_document),
    )
    role_arn = created_role["Role"]["Arn"]

    serialized_policy = json.dumps(policy_document)
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=serialized_policy,
    )

    assumed = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    return assumed["Credentials"]
@set_initial_no_auth_action_count(0)
@mock_iam
def test_invalid_client_token_id():
    """An IAM call with an unknown access key fails with InvalidClientTokenId (403)."""
    bogus_credentials = {
        "aws_access_key_id": "invalid",
        "aws_secret_access_key": "invalid",
    }
    iam = boto3.client("iam", region_name="us-east-1", **bogus_credentials)

    with pytest.raises(ClientError) as exc_info:
        iam.get_user()

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("InvalidClientTokenId")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        "The security token included in the request is invalid."
    )
@set_initial_no_auth_action_count(0)
@mock_ec2
def test_auth_failure():
    """An EC2 call with an unknown access key fails with AuthFailure (401)."""
    bogus_credentials = {
        "aws_access_key_id": "invalid",
        "aws_secret_access_key": "invalid",
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **bogus_credentials)

    with pytest.raises(ClientError) as exc_info:
        ec2.describe_instances()

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AuthFailure")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(401)
    response["Error"]["Message"].should.equal(
        "AWS was not able to validate the provided access credentials"
    )
@set_initial_no_auth_action_count(2)
@mock_iam
def test_signature_does_not_match():
    """A real access key id with a wrong secret fails with SignatureDoesNotMatch (403)."""
    key_id = create_user_with_access_key()["AccessKeyId"]
    iam = boto3.client(
        "iam",
        region_name="us-east-1",
        aws_access_key_id=key_id,
        aws_secret_access_key="invalid",
    )

    with pytest.raises(ClientError) as exc_info:
        iam.get_user()

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("SignatureDoesNotMatch")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        "The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details."
    )
@set_initial_no_auth_action_count(2)
@mock_ec2
def test_auth_failure_with_valid_access_key_id():
    """EC2 answers AuthFailure (401) when the key id exists but the secret is wrong."""
    key_id = create_user_with_access_key()["AccessKeyId"]
    ec2 = boto3.client(
        "ec2",
        region_name="us-east-1",
        aws_access_key_id=key_id,
        aws_secret_access_key="invalid",
    )

    with pytest.raises(ClientError) as exc_info:
        ec2.describe_instances()

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AuthFailure")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(401)
    response["Error"]["Message"].should.equal(
        "AWS was not able to validate the provided access credentials"
    )
@set_initial_no_auth_action_count(2)
@mock_ec2
def test_access_denied_with_no_policy():
    """A user without any policy is denied ec2:DescribeInstances (403)."""
    user_name = "test-user"
    user_key = create_user_with_access_key(user_name)
    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **user_credentials)

    with pytest.raises(ClientError) as exc_info:
        ec2.describe_instances()

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:DescribeInstances"
    )
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_with_not_allowing_policy():
    """A policy allowing only ec2:Run* does not authorize ec2:DescribeInstances."""
    user_name = "test-user"
    allow_run_only = {"Effect": "Allow", "Action": ["ec2:Run*"], "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_run_only],
    }
    user_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **user_credentials)

    with pytest.raises(ClientError) as exc_info:
        ec2.describe_instances()

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:DescribeInstances"
    )
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_for_run_instances():
    """A policy allowing only ec2:Describe* does not authorize ec2:RunInstances."""
    # Regression guard for https://github.com/spulec/moto/issues/2774
    # run-instances was broken between botocore versions 1.15.8 and 1.15.12:
    # the inclusion of '"idempotencyToken":true' in the response somehow
    # altered the signature and broke the authentication.
    # The test stays in place in case botocore decides to break again.
    user_name = "test-user"
    allow_describe_only = {
        "Effect": "Allow",
        "Action": ["ec2:Describe*"],
        "Resource": "*",
    }
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_describe_only],
    }
    user_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **user_credentials)

    with pytest.raises(ClientError) as exc_info:
        ec2.run_instances(MaxCount=1, MinCount=1)

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:RunInstances"
    )
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_with_denying_policy():
    """An explicit Deny on ec2:CreateVpc wins over an Allow on ec2:*."""
    user_name = "test-user"
    allow_all_ec2 = {"Effect": "Allow", "Action": ["ec2:*"], "Resource": "*"}
    deny_create_vpc = {"Effect": "Deny", "Action": "ec2:CreateVpc", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_all_ec2, deny_create_vpc],
    }
    user_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **user_credentials)

    with pytest.raises(ClientError) as exc_info:
        ec2.create_vpc(CidrBlock="10.0.0.0/16")

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:CreateVpc"
    )
@set_initial_no_auth_action_count(3)
@mock_sts
def test_get_caller_identity_allowed_with_denying_policy():
    """sts:GetCallerIdentity still succeeds when a policy explicitly denies it."""
    user_name = "test-user"
    deny_get_caller_identity = {
        "Effect": "Deny",
        "Action": "sts:GetCallerIdentity",
        "Resource": "*",
    }
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [deny_get_caller_identity],
    }
    user_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    sts = boto3.client("sts", region_name="us-east-1", **user_credentials)

    identity = sts.get_caller_identity()
    identity.should.be.a(dict)
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_allowed_with_wildcard_action():
    """An Allow on the wildcard ec2:Describe* authorizes describe_tags."""
    user_name = "test-user"
    allow_describe = {"Effect": "Allow", "Action": "ec2:Describe*", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_describe],
    }
    user_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **user_credentials)

    tags = ec2.describe_tags()["Tags"]
    tags.should.be.empty
@set_initial_no_auth_action_count(4)
@mock_iam
def test_allowed_with_explicit_action_in_attached_policy():
    """An attached policy allowing iam:ListGroups authorizes list_groups."""
    user_name = "test-user"
    allow_list_groups = {
        "Effect": "Allow",
        "Action": "iam:ListGroups",
        "Resource": "*",
    }
    attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_list_groups],
    }
    user_key = create_user_with_access_key_and_attached_policy(
        user_name, attached_policy_document
    )
    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    iam = boto3.client("iam", region_name="us-east-1", **user_credentials)

    groups = iam.list_groups()["Groups"]
    groups.should.be.empty
@set_initial_no_auth_action_count(8)
@mock_s3
@mock_iam
def test_s3_access_denied_with_denying_attached_group_policy():
    """A group's attached Deny on s3:List* overrides the user's Allow on s3:ListAllMyBuckets."""
    user_name = "test-user"
    allow_list_buckets = {
        "Effect": "Allow",
        "Action": "s3:ListAllMyBuckets",
        "Resource": "*",
    }
    attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_list_buckets],
    }
    deny_s3_list = {"Effect": "Deny", "Action": "s3:List*", "Resource": "*"}
    group_attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [deny_s3_list],
    }

    user_key = create_user_with_access_key_and_attached_policy(
        user_name, attached_policy_document, policy_name="policy1"
    )
    create_group_with_attached_policy_and_add_user(
        user_name, group_attached_policy_document, policy_name="policy2"
    )

    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    s3 = boto3.client("s3", region_name="us-east-1", **user_credentials)

    with pytest.raises(ClientError) as exc_info:
        s3.list_buckets()

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal("Access Denied")
@set_initial_no_auth_action_count(6)
@mock_s3
@mock_iam
def test_s3_access_denied_with_denying_inline_group_policy():
    """A group's inline Deny on s3:GetObject overrides the user's Allow on everything."""
    user_name = "test-user"
    bucket_name = "test-bucket"
    allow_everything = {"Effect": "Allow", "Action": "*", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_everything],
    }
    deny_get_object = {"Effect": "Deny", "Action": "s3:GetObject", "Resource": "*"}
    group_inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [deny_get_object],
    }

    user_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    create_group_with_inline_policy_and_add_user(
        user_name, group_inline_policy_document
    )

    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    s3 = boto3.client("s3", region_name="us-east-1", **user_credentials)
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc_info:
        s3.get_object(Bucket=bucket_name, Key="sdfsdf")

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal("Access Denied")
@set_initial_no_auth_action_count(10)
@mock_iam
@mock_ec2
def test_access_denied_with_many_irrelevant_policies():
    """ec2:CreateKeyPair is denied when none of the user/group policies allow it."""
    user_name = "test-user"

    def single_statement_policy(effect, action):
        # Wrap one statement in a policy document applying to every resource.
        return {
            "Version": "2012-10-17",
            "Statement": [{"Effect": effect, "Action": action, "Resource": "*"}],
        }

    inline_policy_document = single_statement_policy("Allow", "ec2:Describe*")
    attached_policy_document = single_statement_policy("Allow", "s3:*")
    group_inline_policy_document = single_statement_policy("Deny", "iam:List*")
    group_attached_policy_document = single_statement_policy("Deny", "lambda:*")

    user_key = create_user_with_access_key_and_multiple_policies(
        user_name,
        inline_policy_document,
        attached_policy_document,
        attached_policy_name="policy1",
    )
    create_group_with_multiple_policies_and_add_user(
        user_name,
        group_inline_policy_document,
        group_attached_policy_document,
        attached_policy_name="policy2",
    )

    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    ec2 = boto3.client("ec2", region_name="us-east-1", **user_credentials)

    with pytest.raises(ClientError) as exc_info:
        ec2.create_key_pair(KeyName="TestKey")

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:CreateKeyPair"
    )
@set_initial_no_auth_action_count(4)
@mock_iam
@mock_sts
@mock_ec2
@mock_elbv2
def test_allowed_with_temporary_credentials():
    """Assumed-role credentials may perform the actions the role's policy allows."""
    role_name = "test-role"
    trust_statement = {
        "Effect": "Allow",
        "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
        "Action": "sts:AssumeRole",
    }
    trust_policy_document = {
        "Version": "2012-10-17",
        "Statement": trust_statement,
    }
    allowed_actions = [
        "elasticloadbalancing:CreateLoadBalancer",
        "ec2:DescribeSubnets",
    ]
    attached_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": allowed_actions, "Resource": "*"}
        ],
    }

    credentials = create_role_with_attached_policy_and_assume_it(
        role_name, trust_policy_document, attached_policy_document
    )
    session_credentials = {
        "aws_access_key_id": credentials["AccessKeyId"],
        "aws_secret_access_key": credentials["SecretAccessKey"],
        "aws_session_token": credentials["SessionToken"],
    }
    elbv2_client = boto3.client(
        "elbv2", region_name="us-east-1", **session_credentials
    )
    ec2_client = boto3.client("ec2", region_name="us-east-1", **session_credentials)

    subnets = ec2_client.describe_subnets()["Subnets"]
    subnet_count = len(subnets)
    subnet_count.should.be.greater_than(1)

    # The load balancer is placed in the first two subnets returned.
    first_two_subnet_ids = [subnet["SubnetId"] for subnet in subnets[:2]]
    created = elbv2_client.create_load_balancer(
        Name="test-load-balancer",
        Subnets=first_two_subnet_ids,
    )
    created["LoadBalancers"].should.have.length_of(1)
@set_initial_no_auth_action_count(3)
@mock_iam
@mock_sts
@mock_rds
def test_access_denied_with_temporary_credentials():
    """Assumed-role credentials limited to rds:Describe* cannot create a DB instance."""
    role_name = "test-role"
    session_name = "test-session"
    trust_statement = {
        "Effect": "Allow",
        "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
        "Action": "sts:AssumeRole",
    }
    trust_policy_document = {
        "Version": "2012-10-17",
        "Statement": trust_statement,
    }
    allow_describe_only = {
        "Effect": "Allow",
        "Action": ["rds:Describe*"],
        "Resource": "*",
    }
    # Stored on the role as an inline policy by the helper below.
    role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_describe_only],
    }

    credentials = create_role_with_inline_policy_and_assume_it(
        role_name, trust_policy_document, role_policy_document, session_name
    )
    session_credentials = {
        "aws_access_key_id": credentials["AccessKeyId"],
        "aws_secret_access_key": credentials["SecretAccessKey"],
        "aws_session_token": credentials["SessionToken"],
    }
    rds = boto3.client("rds", region_name="us-east-1", **session_credentials)

    with pytest.raises(ClientError) as exc_info:
        rds.create_db_instance(
            DBInstanceIdentifier="test-db-instance",
            DBInstanceClass="db.t3",
            Engine="aurora-postgresql",
        )

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        f"User: arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{session_name} is not authorized to perform: rds:CreateDBInstance"
    )
@set_initial_no_auth_action_count(3)
@mock_iam
def test_get_user_from_credentials():
    """get_user without arguments returns the user that owns the access key."""
    user_name = "new-test-user"
    allow_all_iam = {"Effect": "Allow", "Action": "iam:*", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_all_iam],
    }
    user_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    iam = boto3.client("iam", region_name="us-east-1", **user_credentials)

    returned_user = iam.get_user()["User"]
    returned_user["UserName"].should.equal(user_name)
@set_initial_no_auth_action_count(0)
@mock_s3
def test_s3_invalid_access_key_id():
    """An S3 call with an unknown access key fails with InvalidAccessKeyId (403)."""
    bogus_credentials = {
        "aws_access_key_id": "invalid",
        "aws_secret_access_key": "invalid",
    }
    s3 = boto3.client("s3", region_name="us-east-1", **bogus_credentials)

    with pytest.raises(ClientError) as exc_info:
        s3.list_buckets()

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("InvalidAccessKeyId")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        "The AWS Access Key Id you provided does not exist in our records."
    )
@set_initial_no_auth_action_count(3)
@mock_s3
@mock_iam
def test_s3_signature_does_not_match():
    """S3 answers SignatureDoesNotMatch (403) when the secret key is wrong."""
    bucket_name = "test-bucket"
    key_id = create_user_with_access_key()["AccessKeyId"]
    s3 = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id=key_id,
        aws_secret_access_key="invalid",
    )
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc_info:
        s3.put_object(Bucket=bucket_name, Key="abc")

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("SignatureDoesNotMatch")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal(
        "The request signature we calculated does not match the signature you provided. Check your key and signing method."
    )
@set_initial_no_auth_action_count(7)
@mock_s3
@mock_iam
def test_s3_access_denied_not_action():
    """A group Deny with NotAction iam:GetUser blocks s3 delete_object."""
    user_name = "test-user"
    bucket_name = "test-bucket"
    allow_everything = {"Effect": "Allow", "Action": "*", "Resource": "*"}
    inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_everything],
    }
    deny_all_but_get_user = {
        "Effect": "Deny",
        "NotAction": "iam:GetUser",
        "Resource": "*",
    }
    group_inline_policy_document = {
        "Version": "2012-10-17",
        "Statement": [deny_all_but_get_user],
    }

    user_key = create_user_with_access_key_and_inline_policy(
        user_name, inline_policy_document
    )
    create_group_with_inline_policy_and_add_user(
        user_name, group_inline_policy_document
    )

    user_credentials = {
        "aws_access_key_id": user_key["AccessKeyId"],
        "aws_secret_access_key": user_key["SecretAccessKey"],
    }
    s3 = boto3.client("s3", region_name="us-east-1", **user_credentials)
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc_info:
        s3.delete_object(Bucket=bucket_name, Key="sdfsdf")

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("AccessDenied")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
    response["Error"]["Message"].should.equal("Access Denied")
@set_initial_no_auth_action_count(4)
@mock_iam
@mock_sts
@mock_s3
def test_s3_invalid_token_with_temporary_credentials():
    """S3 answers InvalidToken (400) when assumed-role keys carry a bad session token."""
    role_name = "test-role"
    session_name = "test-session"
    bucket_name = "test-bucket-888"
    trust_statement = {
        "Effect": "Allow",
        "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
        "Action": "sts:AssumeRole",
    }
    trust_policy_document = {
        "Version": "2012-10-17",
        "Statement": trust_statement,
    }
    allow_everything = {"Effect": "Allow", "Action": ["*"], "Resource": "*"}
    # Stored on the role as an inline policy by the helper below.
    role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [allow_everything],
    }

    credentials = create_role_with_inline_policy_and_assume_it(
        role_name, trust_policy_document, role_policy_document, session_name
    )
    # Real key pair from the assumed role, but a deliberately bogus token.
    s3 = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id=credentials["AccessKeyId"],
        aws_secret_access_key=credentials["SecretAccessKey"],
        aws_session_token="invalid",
    )
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc_info:
        s3.list_bucket_metrics_configurations(Bucket=bucket_name)

    response = exc_info.value.response
    response["Error"]["Code"].should.equal("InvalidToken")
    response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
    response["Error"]["Message"].should.equal(
        "The provided token is malformed or otherwise invalid."
    )