From de23d172ea53b367d9664852b7fd464f70973fcb Mon Sep 17 00:00:00 2001 From: Daniel Fangl Date: Fri, 30 Sep 2022 00:14:03 +0200 Subject: [PATCH] IAM: Make assumed role user ids dependent on role (#5501) --- moto/iam/models.py | 16 ++++++++++----- moto/iam/utils.py | 40 ++++++++++++++++++++++++++++++++++++++ moto/sts/models.py | 10 +++++++--- tests/test_iam/test_iam.py | 22 ++++++++++++--------- tests/test_sts/test_sts.py | 26 +++++++++++++++++++++---- 5 files changed, 93 insertions(+), 21 deletions(-) diff --git a/moto/iam/models.py b/moto/iam/models.py index bef9c3961..83562b7df 100644 --- a/moto/iam/models.py +++ b/moto/iam/models.py @@ -49,6 +49,8 @@ from .utils import ( random_alphanumeric, random_resource_id, random_policy_id, + random_role_id, + generate_access_key_id_from_account_id, ) from ..utilities.tagging_service import TaggingService @@ -983,9 +985,11 @@ class AccessKeyLastUsed: class AccessKey(CloudFormationModel): - def __init__(self, user_name, prefix, status="Active"): + def __init__(self, user_name, prefix, account_id, status="Active"): self.user_name = user_name - self.access_key_id = prefix + random_access_key() + self.access_key_id = generate_access_key_id_from_account_id( + account_id, prefix=prefix, total_length=20 + ) self.secret_access_key = random_alphanumeric(40) self.status = status self.create_date = datetime.utcnow() @@ -1202,7 +1206,9 @@ class User(CloudFormationModel): del self.policies[policy_name] def create_access_key(self, prefix, status="Active") -> AccessKey: - access_key = AccessKey(self.name, prefix=prefix, status=status) + access_key = AccessKey( + self.name, prefix=prefix, status=status, account_id=self.account_id + ) self.access_keys.append(access_key) return access_key @@ -1865,7 +1871,7 @@ class IAMBackend(BaseBackend): max_session_duration, linked_service=None, ): - role_id = random_resource_id() + role_id = random_role_id(self.account_id) if permissions_boundary and not self.policy_arn_regex.match( permissions_boundary ): @@ -2531,7 +2537,7 @@ class IAMBackend(BaseBackend): def create_temp_access_key(self): # Temporary access keys such as the ones returned by STS when assuming a role temporarily - key = AccessKey(user_name=None, prefix="ASIA") + key = AccessKey(user_name=None, prefix="ASIA", account_id=self.account_id) self.access_keys[key.physical_resource_id] = key return key diff --git a/moto/iam/utils.py b/moto/iam/utils.py index 76743751d..e467033e1 100644 --- a/moto/iam/utils.py +++ b/moto/iam/utils.py @@ -1,5 +1,39 @@ from moto.moto_api._internal import mock_random as random import string +import base64 + +AWS_ROLE_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" +ACCOUNT_OFFSET = 549755813888 # int.from_bytes(base64.b32decode(b"QAAAAAAA"), byteorder="big"), start value + + +def _random_uppercase_or_digit_sequence(length): + return "".join(str(random.choice(AWS_ROLE_ALPHABET)) for _ in range(length)) + + +def generate_access_key_id_from_account_id( + account_id: str, prefix: str, total_length: int = 20 +): + """ + Generates a key id (e.g. access key id) for the given account id and prefix + + :param account_id: Account id this key id should belong to + :param prefix: Prefix, e.g. ASIA for temp credentials or AROA for roles + :param total_length: Total length of the access key (e.g. 20 for temp access keys, 21 for role ids) + :return: Generated id + """ + account_id = int(account_id) + id_with_offset = account_id // 2 + ACCOUNT_OFFSET + account_bytes = int.to_bytes(id_with_offset, byteorder="big", length=5) + account_part = base64.b32encode(account_bytes).decode("utf-8") + middle_char = ( + random.choice(AWS_ROLE_ALPHABET[16:]) + if account_id % 2 + else random.choice(AWS_ROLE_ALPHABET[:16]) + ) + semi_fixed_part = prefix + account_part + middle_char + return semi_fixed_part + _random_uppercase_or_digit_sequence( + total_length - len(semi_fixed_part) + ) def random_alphanumeric(length): @@ -15,6 +49,12 @@ def random_resource_id(size=20): return "".join(str(random.choice(chars)) for x in range(size)) +def random_role_id(account_id: str) -> str: + return generate_access_key_id_from_account_id( + account_id=account_id, prefix="AROA", total_length=21 + ) + + def random_access_key(): return "".join( str(random.choice(string.ascii_uppercase + string.digits)) for _ in range(16) diff --git a/moto/sts/models.py b/moto/sts/models.py index 36409b475..fc492bc1c 100644 --- a/moto/sts/models.py +++ b/moto/sts/models.py @@ -7,8 +7,8 @@ from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict from moto.iam import iam_backends from moto.sts.utils import ( random_session_token, - random_assumed_role_id, DEFAULT_STS_SESSION_DURATION, + random_assumed_role_id, ) from typing import Mapping @@ -47,7 +47,6 @@ class AssumedRole(BaseModel): self.access_key_id = access_key.access_key_id self.secret_access_key = access_key.secret_access_key self.session_token = random_session_token() - self.assumed_role_id = "AROA" + random_assumed_role_id() @property def expiration_ISO8601(self): @@ -55,7 +54,12 @@ class AssumedRole(BaseModel): @property def user_id(self): - return self.assumed_role_id + ":" + self.session_name + iam_backend = iam_backends[self.account_id]["global"] + try: + role_id = iam_backend.get_role_by_arn(arn=self.role_arn).id + except Exception: + role_id = "AROA" + random_assumed_role_id() + return role_id + ":" + self.session_name @property def arn(self): diff --git a/tests/test_iam/test_iam.py b/tests/test_iam/test_iam.py index cb71832d4..54547cdd7 100644 --- a/tests/test_iam/test_iam.py +++ b/tests/test_iam/test_iam.py @@ -2525,7 +2525,7 @@ def test_create_role_defaults(): # Get role: role = conn.get_role(RoleName="my-role")["Role"] - + assert role["RoleId"].startswith("AROA") assert role["MaxSessionDuration"] == 3600 assert role.get("Description") is None @@ -3455,7 +3455,7 @@ def test_role_list_config_discovered_resources(): @mock_iam def test_role_config_dict(): from moto.iam.config import role_config_query, policy_config_query - from moto.iam.utils import random_resource_id, random_policy_id + from moto.iam.utils import random_role_id, random_policy_id # Without any roles assert not role_config_query.get_config_resource(DEFAULT_ACCOUNT_ID, "something") @@ -3511,7 +3511,7 @@ def test_role_config_dict(): DEFAULT_ACCOUNT_ID, None, None, 100, None )[0][0] assert plain_role is not None - assert len(plain_role["id"]) == len(random_resource_id()) + assert len(plain_role["id"]) == len(random_role_id(DEFAULT_ACCOUNT_ID)) role_config_query.backends[DEFAULT_ACCOUNT_ID]["global"].create_role( role_name="assume_role", @@ -3531,7 +3531,7 @@ def test_role_config_dict(): if role["id"] not in [plain_role["id"]] ) assert assume_role is not None - assert len(assume_role["id"]) == len(random_resource_id()) + assert len(assume_role["id"]) == len(random_role_id(DEFAULT_ACCOUNT_ID)) assert assume_role["id"] is not plain_role["id"] role_config_query.backends[DEFAULT_ACCOUNT_ID]["global"].create_role( @@ -3552,7 +3552,9 @@ def test_role_config_dict(): if role["id"] not in [plain_role["id"], assume_role["id"]] ) assert assume_and_permission_boundary_role is not None - assert len(assume_and_permission_boundary_role["id"]) == len(random_resource_id()) + assert len(assume_and_permission_boundary_role["id"]) == len( + random_role_id(DEFAULT_ACCOUNT_ID) + ) assert assume_and_permission_boundary_role["id"] is not plain_role["id"] assert assume_and_permission_boundary_role["id"] is not assume_role["id"] @@ -3581,7 +3583,9 @@ def test_role_config_dict(): ] ) assert role_with_attached_policy is not None - assert len(role_with_attached_policy["id"]) == len(random_resource_id()) + assert len(role_with_attached_policy["id"]) == len( + random_role_id(DEFAULT_ACCOUNT_ID) + ) assert role_with_attached_policy["id"] is not plain_role["id"] assert role_with_attached_policy["id"] is not assume_role["id"] assert ( @@ -3615,7 +3619,7 @@ def test_role_config_dict(): ] ) assert role_with_inline_policy is not None - assert len(role_with_inline_policy["id"]) == len(random_resource_id()) + assert len(role_with_inline_policy["id"]) == len(random_role_id(DEFAULT_ACCOUNT_ID)) assert role_with_inline_policy["id"] is not plain_role["id"] assert role_with_inline_policy["id"] is not assume_role["id"] assert ( @@ -3734,7 +3738,7 @@ def test_role_config_client(): raise SkipTest( "Cannot test this in Py3.6; outdated botocore dependencies do not have all regions" ) - from moto.iam.utils import random_resource_id + from moto.iam.utils import random_role_id CONFIG_REGIONS = boto3.Session().get_available_regions("config") @@ -3789,7 +3793,7 @@ def test_role_config_client(): ) first_result = result["resourceIdentifiers"][0]["resourceId"] assert result["resourceIdentifiers"][0]["resourceType"] == "AWS::IAM::Role" - assert len(first_result) == len(random_resource_id()) + assert len(first_result) == len(random_role_id(DEFAULT_ACCOUNT_ID)) # Test non-aggregated pagination assert ( diff --git a/tests/test_sts/test_sts.py b/tests/test_sts/test_sts.py index 025fa0be6..3d122f02e 100644 --- a/tests/test_sts/test_sts.py +++ b/tests/test_sts/test_sts.py @@ -61,8 +61,10 @@ def test_get_federation_token_boto3(): @freeze_time("2012-01-01 12:00:00") @mock_sts +@mock_iam def test_assume_role(): client = boto3.client("sts", region_name="us-east-1") + iam_client = boto3.client("iam", region_name="us-east-1") session_name = "session-name" policy = json.dumps( @@ -77,12 +79,24 @@ def test_assume_role(): ] } ) + trust_policy_document = { + "Version": "2012-10-17", + "Statement": { + "Effect": "Allow", + "Principal": { + "AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID) + }, + "Action": "sts:AssumeRole", + }, + } role_name = "test-role" - s3_role = "arn:aws:iam::{account_id}:role/{role_name}".format( - account_id=ACCOUNT_ID, role_name=role_name - ) + role = iam_client.create_role( + RoleName="test-role", AssumeRolePolicyDocument=json.dumps(trust_policy_document) + )["Role"] + role_id = role["RoleId"] + role_arn = role["Arn"] assume_role_response = client.assume_role( - RoleArn=s3_role, + RoleArn=role_arn, RoleSessionName=session_name, Policy=policy, DurationSeconds=900, @@ -103,6 +117,10 @@ def test_assume_role(): ) ) assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith("AROA") + assert ( + assume_role_response["AssumedRoleUser"]["AssumedRoleId"].rpartition(":")[0] + == role_id + ) assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].endswith( ":" + session_name )