Implemented get-caller-identity returning real data depending on the access key used.

This commit is contained in:
acsbendi 2019-08-21 12:20:35 +02:00
parent d9cb1f2d35
commit d931204266
3 changed files with 96 additions and 17 deletions

View File

@ -22,7 +22,7 @@ class AssumedRole(BaseModel):
def __init__(self, role_session_name, role_arn, policy, duration, external_id): def __init__(self, role_session_name, role_arn, policy, duration, external_id):
self.session_name = role_session_name self.session_name = role_session_name
self.arn = role_arn self.arn = role_arn + "/" + role_session_name
self.policy = policy self.policy = policy
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
self.expiration = now + datetime.timedelta(seconds=duration) self.expiration = now + datetime.timedelta(seconds=duration)
@ -36,6 +36,10 @@ class AssumedRole(BaseModel):
def expiration_ISO8601(self): def expiration_ISO8601(self):
return iso_8601_datetime_with_milliseconds(self.expiration) return iso_8601_datetime_with_milliseconds(self.expiration)
@property
def user_id(self):
return self.assumed_role_id + ":" + self.session_name
class STSBackend(BaseBackend): class STSBackend(BaseBackend):
@ -55,5 +59,11 @@ class STSBackend(BaseBackend):
self.assumed_roles.append(role) self.assumed_roles.append(role)
return role return role
def get_assumed_role_from_access_key(self, access_key_id):
for assumed_role in self.assumed_roles:
if assumed_role.access_key_id == access_key_id:
return assumed_role
return None
sts_backend = STSBackend() sts_backend = STSBackend()

View File

@ -1,6 +1,8 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from moto.iam.models import ACCOUNT_ID
from moto.iam import iam_backend
from .models import sts_backend from .models import sts_backend
@ -19,7 +21,7 @@ class TokenResponse(BaseResponse):
token = sts_backend.get_federation_token( token = sts_backend.get_federation_token(
duration=duration, name=name, policy=policy) duration=duration, name=name, policy=policy)
template = self.response_template(GET_FEDERATION_TOKEN_RESPONSE) template = self.response_template(GET_FEDERATION_TOKEN_RESPONSE)
return template.render(token=token) return template.render(token=token, account_id=ACCOUNT_ID)
def assume_role(self): def assume_role(self):
role_session_name = self.querystring.get('RoleSessionName')[0] role_session_name = self.querystring.get('RoleSessionName')[0]
@ -41,7 +43,23 @@ class TokenResponse(BaseResponse):
def get_caller_identity(self): def get_caller_identity(self):
template = self.response_template(GET_CALLER_IDENTITY_RESPONSE) template = self.response_template(GET_CALLER_IDENTITY_RESPONSE)
return template.render()
# Default values in case the request does not use valid credentials generated by moto
user_id = "AKIAIOSFODNN7EXAMPLE"
arn = "arn:aws:sts::{account_id}:user/moto".format(account_id=ACCOUNT_ID)
access_key_id = self.get_current_user()
assumed_role = sts_backend.get_assumed_role_from_access_key(access_key_id)
if assumed_role:
user_id = assumed_role.user_id
arn = assumed_role.arn
user = iam_backend.get_user_from_access_key_id(access_key_id)
if user:
user_id = user.id
arn = user.arn
return template.render(account_id=ACCOUNT_ID, user_id=user_id, arn=arn)
GET_SESSION_TOKEN_RESPONSE = """<GetSessionTokenResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/"> GET_SESSION_TOKEN_RESPONSE = """<GetSessionTokenResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/">
@ -69,8 +87,8 @@ GET_FEDERATION_TOKEN_RESPONSE = """<GetFederationTokenResponse xmlns="https://st
<AccessKeyId>AKIAIOSFODNN7EXAMPLE</AccessKeyId> <AccessKeyId>AKIAIOSFODNN7EXAMPLE</AccessKeyId>
</Credentials> </Credentials>
<FederatedUser> <FederatedUser>
<Arn>arn:aws:sts::123456789012:federated-user/{{ token.name }}</Arn> <Arn>arn:aws:sts::{{ account_id }}:federated-user/{{ token.name }}</Arn>
<FederatedUserId>123456789012:{{ token.name }}</FederatedUserId> <FederatedUserId>{{ account_id }}:{{ token.name }}</FederatedUserId>
</FederatedUser> </FederatedUser>
<PackedPolicySize>6</PackedPolicySize> <PackedPolicySize>6</PackedPolicySize>
</GetFederationTokenResult> </GetFederationTokenResult>
@ -91,7 +109,7 @@ ASSUME_ROLE_RESPONSE = """<AssumeRoleResponse xmlns="https://sts.amazonaws.com/d
</Credentials> </Credentials>
<AssumedRoleUser> <AssumedRoleUser>
<Arn>{{ role.arn }}</Arn> <Arn>{{ role.arn }}</Arn>
<AssumedRoleId>{{ role.assumed_role_id }}:{{ role.session_name }}</AssumedRoleId> <AssumedRoleId>{{ role.user_id }}</AssumedRoleId>
</AssumedRoleUser> </AssumedRoleUser>
<PackedPolicySize>6</PackedPolicySize> <PackedPolicySize>6</PackedPolicySize>
</AssumeRoleResult> </AssumeRoleResult>
@ -102,9 +120,9 @@ ASSUME_ROLE_RESPONSE = """<AssumeRoleResponse xmlns="https://sts.amazonaws.com/d
GET_CALLER_IDENTITY_RESPONSE = """<GetCallerIdentityResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/"> GET_CALLER_IDENTITY_RESPONSE = """<GetCallerIdentityResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/">
<GetCallerIdentityResult> <GetCallerIdentityResult>
<Arn>arn:aws:sts::123456789012:user/moto</Arn> <Arn>{{ arn }}</Arn>
<UserId>AKIAIOSFODNN7EXAMPLE</UserId> <UserId>{{ user_id }}</UserId>
<Account>123456789012</Account> <Account>{{ account_id }}</Account>
</GetCallerIdentityResult> </GetCallerIdentityResult>
<ResponseMetadata> <ResponseMetadata>
<RequestId>c6104cbe-af31-11e0-8154-cbc7ccf896c7</RequestId> <RequestId>c6104cbe-af31-11e0-8154-cbc7ccf896c7</RequestId>

View File

@ -6,7 +6,8 @@ import boto3
from freezegun import freeze_time from freezegun import freeze_time
import sure # noqa import sure # noqa
from moto import mock_sts, mock_sts_deprecated from moto import mock_sts, mock_sts_deprecated, mock_iam
from moto.iam.models import ACCOUNT_ID
@freeze_time("2012-01-01 12:00:00") @freeze_time("2012-01-01 12:00:00")
@ -26,7 +27,8 @@ def test_get_session_token():
@mock_sts_deprecated @mock_sts_deprecated
def test_get_federation_token(): def test_get_federation_token():
conn = boto.connect_sts() conn = boto.connect_sts()
token = conn.get_federation_token(duration=123, name="Bob") token_name = "Bob"
token = conn.get_federation_token(duration=123, name=token_name)
token.credentials.expiration.should.equal('2012-01-01T12:02:03.000Z') token.credentials.expiration.should.equal('2012-01-01T12:02:03.000Z')
token.credentials.session_token.should.equal( token.credentials.session_token.should.equal(
@ -35,8 +37,8 @@ def test_get_federation_token():
token.credentials.secret_key.should.equal( token.credentials.secret_key.should.equal(
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY") "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY")
token.federated_user_arn.should.equal( token.federated_user_arn.should.equal(
"arn:aws:sts::123456789012:federated-user/Bob") "arn:aws:sts::{account_id}:federated-user/{token_name}".format(account_id=ACCOUNT_ID, token_name=token_name))
token.federated_user_id.should.equal("123456789012:Bob") token.federated_user_id.should.equal(str(ACCOUNT_ID) + ":" + token_name)
@freeze_time("2012-01-01 12:00:00") @freeze_time("2012-01-01 12:00:00")
@ -72,17 +74,66 @@ def test_assume_role():
assert credentials['AccessKeyId'].startswith("ASIA") assert credentials['AccessKeyId'].startswith("ASIA")
credentials['SecretAccessKey'].should.have.length_of(40) credentials['SecretAccessKey'].should.have.length_of(40)
assume_role_response['AssumedRoleUser']['Arn'].should.equal("arn:aws:iam::123456789012:role/test-role") assume_role_response['AssumedRoleUser']['Arn'].should.equal("arn:aws:iam::123456789012:role/test-role/" + session_name)
assert assume_role_response['AssumedRoleUser']['AssumedRoleId'].startswith("AROA") assert assume_role_response['AssumedRoleUser']['AssumedRoleId'].startswith("AROA")
assert assume_role_response['AssumedRoleUser']['AssumedRoleId'].endswith(":" + session_name) assert assume_role_response['AssumedRoleUser']['AssumedRoleId'].endswith(":" + session_name)
assume_role_response['AssumedRoleUser']['AssumedRoleId'].should.have.length_of(21 + 1 + len(session_name)) assume_role_response['AssumedRoleUser']['AssumedRoleId'].should.have.length_of(21 + 1 + len(session_name))
@mock_sts @mock_sts
def test_get_caller_identity(): def test_get_caller_identity_with_default_credentials():
identity = boto3.client( identity = boto3.client(
"sts", region_name='us-east-1').get_caller_identity() "sts", region_name='us-east-1').get_caller_identity()
identity['Arn'].should.equal('arn:aws:sts::123456789012:user/moto') identity['Arn'].should.equal('arn:aws:sts::{account_id}:user/moto'.format(account_id=ACCOUNT_ID))
identity['UserId'].should.equal('AKIAIOSFODNN7EXAMPLE') identity['UserId'].should.equal('AKIAIOSFODNN7EXAMPLE')
identity['Account'].should.equal('123456789012') identity['Account'].should.equal(str(ACCOUNT_ID))
@mock_sts
@mock_iam
def test_get_caller_identity_with_iam_user_credentials():
iam_client = boto3.client("iam", region_name='us-east-1')
iam_user_name = "new-user"
iam_user = iam_client.create_user(UserName=iam_user_name)['User']
access_key = iam_client.create_access_key(UserName=iam_user_name)['AccessKey']
identity = boto3.client(
"sts", region_name='us-east-1', aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey']).get_caller_identity()
identity['Arn'].should.equal(iam_user['Arn'])
identity['UserId'].should.equal(iam_user['UserId'])
identity['Account'].should.equal(str(ACCOUNT_ID))
@mock_sts
@mock_iam
def test_get_caller_identity_with_assumed_role_credentials():
iam_client = boto3.client("iam", region_name='us-east-1')
sts_client = boto3.client("sts", region_name='us-east-1')
iam_role_name = "new-user"
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)},
"Action": "sts:AssumeRole"
}
}
iam_role_arn = iam_client.role_arn = iam_client.create_role(
RoleName=iam_role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy_document)
)['Role']['Arn']
session_name = "new-session"
assumed_role = sts_client.assume_role(RoleArn=iam_role_arn,
RoleSessionName=session_name)
access_key = assumed_role['Credentials']
identity = boto3.client(
"sts", region_name='us-east-1', aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey']).get_caller_identity()
identity['Arn'].should.equal(assumed_role['AssumedRoleUser']['Arn'])
identity['UserId'].should.equal(assumed_role['AssumedRoleUser']['AssumedRoleId'])
identity['Account'].should.equal(str(ACCOUNT_ID))