IAM: Make assumed role user ids dependent on role (#5501)

This commit is contained in:
Daniel Fangl 2022-09-30 00:14:03 +02:00 committed by GitHub
parent f082562b0f
commit de23d172ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 21 deletions

View File

@ -49,6 +49,8 @@ from .utils import (
random_alphanumeric,
random_resource_id,
random_policy_id,
random_role_id,
generate_access_key_id_from_account_id,
)
from ..utilities.tagging_service import TaggingService
@ -983,9 +985,11 @@ class AccessKeyLastUsed:
class AccessKey(CloudFormationModel):
def __init__(self, user_name, prefix, status="Active"):
def __init__(self, user_name, prefix, account_id, status="Active"):
self.user_name = user_name
self.access_key_id = prefix + random_access_key()
self.access_key_id = generate_access_key_id_from_account_id(
account_id, prefix=prefix, total_length=20
)
self.secret_access_key = random_alphanumeric(40)
self.status = status
self.create_date = datetime.utcnow()
@ -1202,7 +1206,9 @@ class User(CloudFormationModel):
del self.policies[policy_name]
def create_access_key(self, prefix, status="Active") -> AccessKey:
access_key = AccessKey(self.name, prefix=prefix, status=status)
access_key = AccessKey(
self.name, prefix=prefix, status=status, account_id=self.account_id
)
self.access_keys.append(access_key)
return access_key
@ -1865,7 +1871,7 @@ class IAMBackend(BaseBackend):
max_session_duration,
linked_service=None,
):
role_id = random_resource_id()
role_id = random_role_id(self.account_id)
if permissions_boundary and not self.policy_arn_regex.match(
permissions_boundary
):
@ -2531,7 +2537,7 @@ class IAMBackend(BaseBackend):
def create_temp_access_key(self):
# Temporary access keys such as the ones returned by STS when assuming a role temporarily
key = AccessKey(user_name=None, prefix="ASIA")
key = AccessKey(user_name=None, prefix="ASIA", account_id=self.account_id)
self.access_keys[key.physical_resource_id] = key
return key

View File

@ -1,5 +1,39 @@
from moto.moto_api._internal import mock_random as random
import string
import base64
AWS_ROLE_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
ACCOUNT_OFFSET = 549755813888 # int.from_bytes(base64.b32decode(b"QAAAAAAA"), byteorder="big"), start value
def _random_uppercase_or_digit_sequence(length):
return "".join(str(random.choice(AWS_ROLE_ALPHABET)) for _ in range(length))
def generate_access_key_id_from_account_id(
account_id: str, prefix: str, total_length: int = 20
):
"""
Generates a key id (e.g. access key id) for the given account id and prefix
:param account_id: Account id this key id should belong to
:param prefix: Prefix, e.g. ASIA for temp credentials or AROA for roles
:param total_length: Total length of the access key (e.g. 20 for temp access keys, 21 for role ids)
:return: Generated id
"""
account_id = int(account_id)
id_with_offset = account_id // 2 + ACCOUNT_OFFSET
account_bytes = int.to_bytes(id_with_offset, byteorder="big", length=5)
account_part = base64.b32encode(account_bytes).decode("utf-8")
middle_char = (
random.choice(AWS_ROLE_ALPHABET[16:])
if account_id % 2
else random.choice(AWS_ROLE_ALPHABET[:16])
)
semi_fixed_part = prefix + account_part + middle_char
return semi_fixed_part + _random_uppercase_or_digit_sequence(
total_length - len(semi_fixed_part)
)
def random_alphanumeric(length):
@ -15,6 +49,12 @@ def random_resource_id(size=20):
return "".join(str(random.choice(chars)) for x in range(size))
def random_role_id(account_id: str) -> str:
return generate_access_key_id_from_account_id(
account_id=account_id, prefix="AROA", total_length=21
)
def random_access_key():
return "".join(
str(random.choice(string.ascii_uppercase + string.digits)) for _ in range(16)

View File

@ -7,8 +7,8 @@ from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict
from moto.iam import iam_backends
from moto.sts.utils import (
random_session_token,
random_assumed_role_id,
DEFAULT_STS_SESSION_DURATION,
random_assumed_role_id,
)
from typing import Mapping
@ -47,7 +47,6 @@ class AssumedRole(BaseModel):
self.access_key_id = access_key.access_key_id
self.secret_access_key = access_key.secret_access_key
self.session_token = random_session_token()
self.assumed_role_id = "AROA" + random_assumed_role_id()
@property
def expiration_ISO8601(self):
@ -55,7 +54,12 @@ class AssumedRole(BaseModel):
@property
def user_id(self):
return self.assumed_role_id + ":" + self.session_name
iam_backend = iam_backends[self.account_id]["global"]
try:
role_id = iam_backend.get_role_by_arn(arn=self.role_arn).id
except Exception:
role_id = "AROA" + random_assumed_role_id()
return role_id + ":" + self.session_name
@property
def arn(self):

View File

@ -2525,7 +2525,7 @@ def test_create_role_defaults():
# Get role:
role = conn.get_role(RoleName="my-role")["Role"]
assert role["RoleId"].startswith("AROA")
assert role["MaxSessionDuration"] == 3600
assert role.get("Description") is None
@ -3455,7 +3455,7 @@ def test_role_list_config_discovered_resources():
@mock_iam
def test_role_config_dict():
from moto.iam.config import role_config_query, policy_config_query
from moto.iam.utils import random_resource_id, random_policy_id
from moto.iam.utils import random_role_id, random_policy_id
# Without any roles
assert not role_config_query.get_config_resource(DEFAULT_ACCOUNT_ID, "something")
@ -3511,7 +3511,7 @@ def test_role_config_dict():
DEFAULT_ACCOUNT_ID, None, None, 100, None
)[0][0]
assert plain_role is not None
assert len(plain_role["id"]) == len(random_resource_id())
assert len(plain_role["id"]) == len(random_role_id(DEFAULT_ACCOUNT_ID))
role_config_query.backends[DEFAULT_ACCOUNT_ID]["global"].create_role(
role_name="assume_role",
@ -3531,7 +3531,7 @@ def test_role_config_dict():
if role["id"] not in [plain_role["id"]]
)
assert assume_role is not None
assert len(assume_role["id"]) == len(random_resource_id())
assert len(assume_role["id"]) == len(random_role_id(DEFAULT_ACCOUNT_ID))
assert assume_role["id"] is not plain_role["id"]
role_config_query.backends[DEFAULT_ACCOUNT_ID]["global"].create_role(
@ -3552,7 +3552,9 @@ def test_role_config_dict():
if role["id"] not in [plain_role["id"], assume_role["id"]]
)
assert assume_and_permission_boundary_role is not None
assert len(assume_and_permission_boundary_role["id"]) == len(random_resource_id())
assert len(assume_and_permission_boundary_role["id"]) == len(
random_role_id(DEFAULT_ACCOUNT_ID)
)
assert assume_and_permission_boundary_role["id"] is not plain_role["id"]
assert assume_and_permission_boundary_role["id"] is not assume_role["id"]
@ -3581,7 +3583,9 @@ def test_role_config_dict():
]
)
assert role_with_attached_policy is not None
assert len(role_with_attached_policy["id"]) == len(random_resource_id())
assert len(role_with_attached_policy["id"]) == len(
random_role_id(DEFAULT_ACCOUNT_ID)
)
assert role_with_attached_policy["id"] is not plain_role["id"]
assert role_with_attached_policy["id"] is not assume_role["id"]
assert (
@ -3615,7 +3619,7 @@ def test_role_config_dict():
]
)
assert role_with_inline_policy is not None
assert len(role_with_inline_policy["id"]) == len(random_resource_id())
assert len(role_with_inline_policy["id"]) == len(random_role_id(DEFAULT_ACCOUNT_ID))
assert role_with_inline_policy["id"] is not plain_role["id"]
assert role_with_inline_policy["id"] is not assume_role["id"]
assert (
@ -3734,7 +3738,7 @@ def test_role_config_client():
raise SkipTest(
"Cannot test this in Py3.6; outdated botocore dependencies do not have all regions"
)
from moto.iam.utils import random_resource_id
from moto.iam.utils import random_role_id
CONFIG_REGIONS = boto3.Session().get_available_regions("config")
@ -3789,7 +3793,7 @@ def test_role_config_client():
)
first_result = result["resourceIdentifiers"][0]["resourceId"]
assert result["resourceIdentifiers"][0]["resourceType"] == "AWS::IAM::Role"
assert len(first_result) == len(random_resource_id())
assert len(first_result) == len(random_role_id(DEFAULT_ACCOUNT_ID))
# Test non-aggregated pagination
assert (

View File

@ -61,8 +61,10 @@ def test_get_federation_token_boto3():
@freeze_time("2012-01-01 12:00:00")
@mock_sts
@mock_iam
def test_assume_role():
client = boto3.client("sts", region_name="us-east-1")
iam_client = boto3.client("iam", region_name="us-east-1")
session_name = "session-name"
policy = json.dumps(
@ -77,12 +79,24 @@ def test_assume_role():
]
}
)
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)
},
"Action": "sts:AssumeRole",
},
}
role_name = "test-role"
s3_role = "arn:aws:iam::{account_id}:role/{role_name}".format(
account_id=ACCOUNT_ID, role_name=role_name
)
role = iam_client.create_role(
RoleName="test-role", AssumeRolePolicyDocument=json.dumps(trust_policy_document)
)["Role"]
role_id = role["RoleId"]
role_arn = role["Arn"]
assume_role_response = client.assume_role(
RoleArn=s3_role,
RoleArn=role_arn,
RoleSessionName=session_name,
Policy=policy,
DurationSeconds=900,
@ -103,6 +117,10 @@ def test_assume_role():
)
)
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith("AROA")
assert (
assume_role_response["AssumedRoleUser"]["AssumedRoleId"].rpartition(":")[0]
== role_id
)
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].endswith(
":" + session_name
)