2014-08-27 15:17:06 +00:00
|
|
|
from __future__ import unicode_literals
|
2013-05-24 21:22:34 +00:00
|
|
|
import json
|
|
|
|
|
|
|
|
import boto
|
2017-01-19 03:59:04 +00:00
|
|
|
import boto3
|
2019-07-20 06:25:46 +00:00
|
|
|
from botocore.client import ClientError
|
2013-05-24 21:22:34 +00:00
|
|
|
from freezegun import freeze_time
|
2019-07-20 06:25:46 +00:00
|
|
|
from nose.tools import assert_raises
|
2013-08-03 21:21:25 +00:00
|
|
|
import sure # noqa
|
2013-05-24 21:22:34 +00:00
|
|
|
|
2019-08-21 10:36:40 +00:00
|
|
|
|
2019-08-22 09:06:42 +00:00
|
|
|
from moto import mock_sts, mock_sts_deprecated, mock_iam, settings
|
2019-08-21 10:20:35 +00:00
|
|
|
from moto.iam.models import ACCOUNT_ID
|
2019-07-20 06:25:46 +00:00
|
|
|
from moto.sts.responses import MAX_FEDERATION_TOKEN_POLICY_LENGTH
|
2013-05-24 21:22:34 +00:00
|
|
|
|
|
|
|
|
|
|
|
@freeze_time("2012-01-01 12:00:00")
@mock_sts_deprecated
def test_get_session_token():
    """Request a session token through the boto STS connection and check it.

    The clock is frozen at 2012-01-01 12:00:00, so a 123 second duration
    must yield an expiration of 12:02:03 on the same day.
    """
    sts = boto.connect_sts()
    session = sts.get_session_token(duration=123)

    # Fixed values the test expects back from the mocked backend.
    expected_session_token = "AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE"
    expected_access_key = "AKIAIOSFODNN7EXAMPLE"
    expected_secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY"

    session.expiration.should.equal('2012-01-01T12:02:03.000Z')
    session.session_token.should.equal(expected_session_token)
    session.access_key.should.equal(expected_access_key)
    session.secret_key.should.equal(expected_secret_key)
|
|
|
|
|
|
|
|
|
2014-03-20 16:46:03 +00:00
|
|
|
@freeze_time("2012-01-01 12:00:00")
@mock_sts_deprecated
def test_get_federation_token():
    """Request a federation token through the boto STS connection and check it.

    Verifies the credentials (expiration, session token, access key, secret
    key) as well as the federated user ARN and id, both of which are derived
    from ``ACCOUNT_ID`` and the requested token name.
    """
    token_name = "Bob"
    sts = boto.connect_sts()
    federation = sts.get_federation_token(duration=123, name=token_name)

    # Frozen clock (12:00:00) + 123 seconds.
    creds = federation.credentials
    creds.expiration.should.equal('2012-01-01T12:02:03.000Z')
    creds.session_token.should.equal(
        "AQoDYXdzEPT//////////wEXAMPLEtc764bNrC9SAPBSM22wDOk4x4HIZ8j4FZTwdQWLWsKWHGBuFqwAeMicRXmxfpSPfIeoIYRqTflfKD8YUuwthAx7mSEI/qkPpKPi/kMcGdQrmGdeehM4IC1NtBmUpp2wUE8phUZampKsburEDy0KPkyQDYwT7WZ0wq5VSXDvp75YU9HFvlRd8Tx6q6fE8YQcHNVXAkiY9q6d+xo0rKwT38xVqr7ZD0u0iPPkUL64lIZbqBAz+scqKmlzm8FDrypNC9Yjc8fPOLn9FX9KSYvKTr4rvx3iSIlTJabIQwj2ICCR/oLxBA==")
    creds.access_key.should.equal("AKIAIOSFODNN7EXAMPLE")
    creds.secret_key.should.equal("wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY")

    expected_arn = "arn:aws:sts::{account_id}:federated-user/{token_name}".format(
        account_id=ACCOUNT_ID, token_name=token_name)
    expected_user_id = "{0}:{1}".format(ACCOUNT_ID, token_name)

    federation.federated_user_arn.should.equal(expected_arn)
    federation.federated_user_id.should.equal(expected_user_id)
|
2014-03-20 16:46:03 +00:00
|
|
|
|
|
|
|
|
2013-05-24 21:22:34 +00:00
|
|
|
@freeze_time("2012-01-01 12:00:00")
@mock_sts
def test_assume_role():
    """Assume a role through the boto3 STS client and check the response.

    Checks the length and prefix of the returned credentials, and that the
    assumed-role ARN and id are built from ``ACCOUNT_ID``, the role name and
    the session name.
    """
    sts_client = boto3.client("sts", region_name='us-east-1')

    session_name = "session-name"
    role_name = "test-role"

    policy_document = {
        "Statement": [
            {
                "Sid": "Stmt13690092345534",
                "Action": ["S3:ListBucket"],
                "Effect": "Allow",
                "Resource": ["arn:aws:s3:::foobar-tester"],
            },
        ]
    }
    policy = json.dumps(policy_document)
    s3_role = "arn:aws:iam::{account_id}:role/{role_name}".format(
        account_id=ACCOUNT_ID, role_name=role_name)

    assume_role_response = sts_client.assume_role(
        RoleArn=s3_role,
        RoleSessionName=session_name,
        Policy=policy,
        DurationSeconds=900,
    )

    credentials = assume_role_response['Credentials']
    if not settings.TEST_SERVER_MODE:
        # NOTE(review): the expiration is only compared outside server mode,
        # presumably because the frozen clock does not reach a separately
        # running server -- confirm.
        expiration = credentials['Expiration'].isoformat()
        expiration.should.equal('2012-01-01T12:15:00+00:00')

    session_token = credentials['SessionToken']
    session_token.should.have.length_of(356)
    assert session_token.startswith("FQoGZXIvYXdzE")

    access_key_id = credentials['AccessKeyId']
    access_key_id.should.have.length_of(20)
    assert access_key_id.startswith("ASIA")

    credentials['SecretAccessKey'].should.have.length_of(40)

    assumed_user = assume_role_response['AssumedRoleUser']
    expected_arn = "arn:aws:sts::{account_id}:assumed-role/{role_name}/{session_name}".format(
        account_id=ACCOUNT_ID, role_name=role_name, session_name=session_name)
    assumed_user['Arn'].should.equal(expected_arn)

    # The id is expected to be a 21 character "AROA..." part, a colon, and
    # the session name.
    assumed_role_id = assumed_user['AssumedRoleId']
    assert assumed_role_id.startswith("AROA")
    assert assumed_role_id.endswith(":" + session_name)
    assumed_role_id.should.have.length_of(21 + 1 + len(session_name))
|
2017-01-19 03:59:04 +00:00
|
|
|
|
2017-02-24 02:37:43 +00:00
|
|
|
|
2019-07-16 03:27:47 +00:00
|
|
|
@freeze_time("2012-01-01 12:00:00")
@mock_sts_deprecated
def test_assume_role_with_web_identity():
    """Assume a role with web identity through the boto STS connection.

    Checks the expiration against the frozen clock, the length and prefix of
    the returned credentials, and that the assumed-role ARN is built from
    ``ACCOUNT_ID``, the role name and the session name.
    """
    sts = boto.connect_sts()

    role_name = "test-role"
    session_name = "session-name"

    policy_document = {
        "Statement": [
            {
                "Sid": "Stmt13690092345534",
                "Action": ["S3:ListBucket"],
                "Effect": "Allow",
                "Resource": ["arn:aws:s3:::foobar-tester"],
            },
        ]
    }
    policy = json.dumps(policy_document)
    s3_role = "arn:aws:iam::{account_id}:role/{role_name}".format(
        account_id=ACCOUNT_ID, role_name=role_name)

    # NOTE(review): the first three arguments are passed positionally;
    # confirm against boto's signature that the third positional slot is
    # really the one intended for the policy document.
    role = sts.assume_role_with_web_identity(
        s3_role, session_name, policy, duration_seconds=123)

    creds = role.credentials
    # Frozen clock (12:00:00) + 123 seconds.
    creds.expiration.should.equal('2012-01-01T12:02:03.000Z')

    session_token = creds.session_token
    session_token.should.have.length_of(356)
    assert session_token.startswith("FQoGZXIvYXdzE")

    access_key = creds.access_key
    access_key.should.have.length_of(20)
    assert access_key.startswith("ASIA")

    creds.secret_key.should.have.length_of(40)

    expected_arn = "arn:aws:sts::{account_id}:assumed-role/{role_name}/{session_name}".format(
        account_id=ACCOUNT_ID, role_name=role_name, session_name=session_name)
    role.user.arn.should.equal(expected_arn)
    role.user.assume_role_id.should.contain("session-name")
|
|
|
|
|
|
|
|
|
2017-01-19 03:59:04 +00:00
|
|
|
@mock_sts
def test_get_caller_identity_with_default_credentials():
    """Call get_caller_identity on a client created without explicit keys.

    The reported identity is expected to be the ``moto`` user under
    ``ACCOUNT_ID``.
    """
    sts_client = boto3.client("sts", region_name='us-east-1')
    identity = sts_client.get_caller_identity()

    expected_arn = 'arn:aws:sts::{account_id}:user/moto'.format(account_id=ACCOUNT_ID)

    identity['Arn'].should.equal(expected_arn)
    identity['UserId'].should.equal('AKIAIOSFODNN7EXAMPLE')
    identity['Account'].should.equal(str(ACCOUNT_ID))
|
|
|
|
|
|
|
|
|
|
|
|
@mock_sts
@mock_iam
def test_get_caller_identity_with_iam_user_credentials():
    """Call get_caller_identity using an IAM user's access key.

    Creates a user and an access key for it, builds an STS client with that
    key, and expects the reported identity to match the created user.
    """
    iam_user_name = "new-user"
    iam_client = boto3.client("iam", region_name='us-east-1')
    iam_user = iam_client.create_user(UserName=iam_user_name)['User']
    access_key = iam_client.create_access_key(UserName=iam_user_name)['AccessKey']

    # STS client authenticated with the freshly created user's key pair.
    user_sts_client = boto3.client(
        "sts",
        region_name='us-east-1',
        aws_access_key_id=access_key['AccessKeyId'],
        aws_secret_access_key=access_key['SecretAccessKey'],
    )
    identity = user_sts_client.get_caller_identity()

    identity['Arn'].should.equal(iam_user['Arn'])
    identity['UserId'].should.equal(iam_user['UserId'])
    identity['Account'].should.equal(str(ACCOUNT_ID))
|
|
|
|
|
|
|
|
|
|
|
|
@mock_sts
@mock_iam
def test_get_caller_identity_with_assumed_role_credentials():
    """Call get_caller_identity using credentials obtained via assume_role.

    Creates an IAM role whose trust policy allows the account root to call
    ``sts:AssumeRole``, assumes it, builds a second STS client from the
    returned access key, and expects the reported identity to match the
    ``AssumedRoleUser`` section of the assume_role response.
    """
    iam_client = boto3.client("iam", region_name='us-east-1')
    sts_client = boto3.client("sts", region_name='us-east-1')
    iam_role_name = "new-user"
    trust_policy_document = {
        "Version": "2012-10-17",
        "Statement": {
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)},
            "Action": "sts:AssumeRole"
        }
    }
    # Only a local name is bound here; the role ARN is deliberately not
    # stored as an attribute on the IAM client object.
    iam_role_arn = iam_client.create_role(
        RoleName=iam_role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy_document)
    )['Role']['Arn']
    session_name = "new-session"
    assumed_role = sts_client.assume_role(RoleArn=iam_role_arn,
                                          RoleSessionName=session_name)
    access_key = assumed_role['Credentials']

    # STS client authenticated with the assumed role's key pair.
    identity = boto3.client(
        "sts", region_name='us-east-1', aws_access_key_id=access_key['AccessKeyId'],
        aws_secret_access_key=access_key['SecretAccessKey']).get_caller_identity()

    identity['Arn'].should.equal(assumed_role['AssumedRoleUser']['Arn'])
    identity['UserId'].should.equal(assumed_role['AssumedRoleUser']['AssumedRoleId'])
    identity['Account'].should.equal(str(ACCOUNT_ID))
|
2019-07-20 06:25:46 +00:00
|
|
|
|
|
|
|
|
|
|
|
@mock_sts
def test_federation_token_with_too_long_policy():
    """Requesting a federation token with an over-long policy should fail.

    Builds a policy whose JSON form exceeds
    ``MAX_FEDERATION_TOKEN_POLICY_LENGTH`` and expects a ``ClientError`` with
    code ``ValidationError`` whose message mentions that limit.
    """
    sts_client = boto3.client("sts", region_name='us-east-1')
    resource_tmpl = 'arn:aws:s3:::yyyy-xxxxx-cloud-default/my_default_folder/folder-name-%s/*'

    # Thirty near-identical statements to push the policy past the limit.
    statements = [
        {
            'Effect': 'Allow',
            'Action': ['s3:*'],
            'Resource': resource_tmpl % str(index),
        }
        for index in range(30)
    ]
    json_policy = json.dumps({
        'Version': '2012-10-17',
        'Statement': statements,
    })
    # Guard: the fixture must actually be over the limit for the test to
    # mean anything.
    assert len(json_policy) > MAX_FEDERATION_TOKEN_POLICY_LENGTH

    with assert_raises(ClientError) as exc:
        sts_client.get_federation_token(Name='foo', DurationSeconds=3600, Policy=json_policy)

    error = exc.exception.response['Error']
    error['Code'].should.equal('ValidationError')
    error['Message'].should.contain(str(MAX_FEDERATION_TOKEN_POLICY_LENGTH))
|