import json import re from base64 import b64encode from datetime import datetime from unittest.mock import patch import boto3 import pytest from botocore.client import ClientError from freezegun import freeze_time from moto import mock_aws, settings from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.sts.responses import MAX_FEDERATION_TOKEN_POLICY_LENGTH @freeze_time("2012-01-01 12:00:00") @mock_aws def test_get_session_token_boto3(): client = boto3.client("sts", region_name="us-east-1") creds = client.get_session_token(DurationSeconds=903)["Credentials"] assert isinstance(creds["Expiration"], datetime) if not settings.TEST_SERVER_MODE: fdate = creds["Expiration"].strftime("%Y-%m-%dT%H:%M:%S.000Z") assert fdate == "2012-01-01T12:15:03.000Z" assert creds["SessionToken"] == ( "AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrR" "h3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb" "4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7" "b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE" ) assert creds["AccessKeyId"] == "AKIAIOSFODNN7EXAMPLE" assert creds["SecretAccessKey"] == "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY" @freeze_time("2012-01-01 12:00:00") @mock_aws def test_get_federation_token_boto3(): client = boto3.client("sts", region_name="us-east-1") token_name = "Bob" fed_token = client.get_federation_token(DurationSeconds=903, Name=token_name) creds = fed_token["Credentials"] fed_user = fed_token["FederatedUser"] assert isinstance(creds["Expiration"], datetime) if not settings.TEST_SERVER_MODE: fdate = creds["Expiration"].strftime("%Y-%m-%dT%H:%M:%S.000Z") assert fdate == "2012-01-01T12:15:03.000Z" assert creds["SessionToken"] == ( "AQoDYXdzEPT//////////wEXAMPLEtc764bNrC9SAPBSM22wDOk4x4HIZ8j4FZ" "TwdQWLWsKWHGBuFqwAeMicRXmxfpSPfIeoIYRqTflfKD8YUuwthAx7mSEI/qkP" "pKPi/kMcGdQrmGdeehM4IC1NtBmUpp2wUE8phUZampKsburEDy0KPkyQDYwT7W" "Z0wq5VSXDvp75YU9HFvlRd8Tx6q6fE8YQcHNVXAkiY9q6d+xo0rKwT38xVqr7Z" "D0u0iPPkUL64lIZbqBAz+scqKmlzm8FDrypNC9Yjc8fPOLn9FX9KSYvKTr4rvx" "3iSIlTJabIQwj2ICCR/oLxBA==" ) assert creds["AccessKeyId"] == "AKIAIOSFODNN7EXAMPLE" assert creds["SecretAccessKey"] == "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY" assert fed_user["Arn"] == (f"arn:aws:sts::{ACCOUNT_ID}:federated-user/{token_name}") assert fed_user["FederatedUserId"] == f"{ACCOUNT_ID}:{token_name}" @freeze_time("2012-01-01 12:00:00") @mock_aws @pytest.mark.parametrize( "region,partition", [("us-east-1", "aws"), ("cn-north-1", "aws-cn")] ) def test_assume_role(region, partition): client = boto3.client("sts", region_name=region) iam_client = boto3.client("iam", region_name=region) session_name = "session-name" policy = json.dumps( { "Statement": [ { "Sid": "Stmt13690092345534", "Action": ["S3:ListBucket"], "Effect": "Allow", "Resource": ["arn:aws:s3:::foobar-tester"], } ] } ) trust_policy_document = { "Version": "2012-10-17", "Statement": { "Effect": "Allow", "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"}, "Action": "sts:AssumeRole", }, } role_name = "test-role" role = iam_client.create_role( RoleName="test-role", AssumeRolePolicyDocument=json.dumps(trust_policy_document) )["Role"] role_id = role["RoleId"] role_arn = role["Arn"] assume_role_response = client.assume_role( RoleArn=role_arn, RoleSessionName=session_name, Policy=policy, DurationSeconds=900, ) credentials = assume_role_response["Credentials"] if not settings.TEST_SERVER_MODE: assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00" assert len(credentials["SessionToken"]) == 356 assert credentials["SessionToken"].startswith("FQoGZXIvYXdzE") assert len(credentials["AccessKeyId"]) == 20 assert credentials["AccessKeyId"].startswith("ASIA") assert len(credentials["SecretAccessKey"]) == 40 assert assume_role_response["AssumedRoleUser"]["Arn"] == ( f"arn:{partition}:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{session_name}" ) assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith("AROA") assert ( assume_role_response["AssumedRoleUser"]["AssumedRoleId"].rpartition(":")[0] == role_id ) assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].endswith( ":" + session_name ) assert len(assume_role_response["AssumedRoleUser"]["AssumedRoleId"]) == ( 21 + 1 + len(session_name) ) @freeze_time("2012-01-01 12:00:00") @mock_aws def test_assume_role_with_saml(): client = boto3.client("sts", region_name="us-east-1") role_name = "test-role" provider_name = "TestProvFed" fed_identifier = "7ca82df9-1bad-4dd3-9b2b-adb68b554282" fed_name = "testuser" role_input = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" principal_role = f"arn:aws:iam:{ACCOUNT_ID}:saml-provider/{provider_name}" saml_assertion = f""" http://localhost/ http://localhost:3000/ NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= {fed_identifier} urn:amazon:webservices {fed_name} arn:aws:iam::{ACCOUNT_ID}:saml-provider/{provider_name},arn:aws:iam::{ACCOUNT_ID}:role/{role_name} 900 urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport """.replace("\n", "") assume_role_response = client.assume_role_with_saml( RoleArn=role_input, PrincipalArn=principal_role, SAMLAssertion=b64encode(saml_assertion.encode("utf-8")).decode("utf-8"), ) credentials = assume_role_response["Credentials"] if not settings.TEST_SERVER_MODE: assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00" assert len(credentials["SessionToken"]) == 356 assert credentials["SessionToken"].startswith("FQoGZXIvYXdzE") assert len(credentials["AccessKeyId"]) == 20 assert credentials["AccessKeyId"].startswith("ASIA") assert len(credentials["SecretAccessKey"]) == 40 assert assume_role_response["AssumedRoleUser"]["Arn"] == ( f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{fed_name}" ) assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith("AROA") assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].endswith( f":{fed_name}" ) assert len(assume_role_response["AssumedRoleUser"]["AssumedRoleId"]) == ( 21 + 1 + len(f"{fed_name}") ) @freeze_time("2012-01-01 12:00:00") @mock_aws def test_assume_role_with_saml_should_not_rely_on_attribute_order(): client = boto3.client("sts", region_name="us-east-1") role_name = "test-role" provider_name = "TestProvFed" fed_identifier = "7ca82df9-1bad-4dd3-9b2b-adb68b554282" fed_name = "testuser" role_input = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" principal_role = f"arn:aws:iam:{ACCOUNT_ID}:saml-provider/{provider_name}" saml_assertion = f""" http://localhost/ http://localhost:3000/ NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= {fed_identifier} urn:amazon:webservices arn:aws:iam::{ACCOUNT_ID}:saml-provider/{provider_name},arn:aws:iam::{ACCOUNT_ID}:role/{role_name} 900 {fed_name} urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport """.replace("\n", "") assume_role_response = client.assume_role_with_saml( RoleArn=role_input, PrincipalArn=principal_role, SAMLAssertion=b64encode(saml_assertion.encode("utf-8")).decode("utf-8"), ) credentials = assume_role_response["Credentials"] if not settings.TEST_SERVER_MODE: assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00" assert assume_role_response["AssumedRoleUser"]["Arn"] == ( f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{fed_name}" ) @freeze_time("2012-01-01 12:00:00") @mock_aws def test_assume_role_with_saml_should_respect_xml_namespaces(): client = boto3.client("sts", region_name="us-east-1") role_name = "test-role" provider_name = "TestProvFed" fed_identifier = "7ca82df9-1bad-4dd3-9b2b-adb68b554282" fed_name = "testuser" role_input = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" principal_role = f"arn:aws:iam:{ACCOUNT_ID}:saml-provider/{provider_name}" saml_assertion = f""" http://localhost/ http://localhost:3000/ NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= {fed_identifier} urn:amazon:webservices {fed_name} arn:aws:iam::{ACCOUNT_ID}:saml-provider/{provider_name},arn:aws:iam::{ACCOUNT_ID}:role/{role_name} 900 urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport """.replace("\n", "") assume_role_response = client.assume_role_with_saml( RoleArn=role_input, PrincipalArn=principal_role, SAMLAssertion=b64encode(saml_assertion.encode("utf-8")).decode("utf-8"), ) credentials = assume_role_response["Credentials"] if not settings.TEST_SERVER_MODE: assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00" assert assume_role_response["AssumedRoleUser"]["Arn"] == ( f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{fed_name}" ) @freeze_time("2012-01-01 12:00:00") @mock_aws def test_assume_role_with_saml_when_xml_tag_contains_xmlns_attributes(): """Test assume role with saml when xml tag contains xmlns attributes. Sssume role with saml should retrieve attribute value when xml tag contains xmlns attributes. """ client = boto3.client("sts", region_name="us-east-1") role_name = "test-role" provider_name = "TestProvFed" fed_identifier = "7ca82df9-1bad-4dd3-9b2b-adb68b554282" fed_name = "testuser" role_input = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" principal_role = f"arn:aws:iam:{ACCOUNT_ID}:saml-provider/{provider_name}" saml_assertion = f""" http://localhost/ http://localhost:3000/ NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= {fed_identifier} urn:amazon:webservices {fed_name} arn:aws:iam::{ACCOUNT_ID}:saml-provider/{provider_name},arn:aws:iam::{ACCOUNT_ID}:role/{role_name} 900 urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport """.replace("\n", "") assume_role_response = client.assume_role_with_saml( RoleArn=role_input, PrincipalArn=principal_role, SAMLAssertion=b64encode(saml_assertion.encode("utf-8")).decode("utf-8"), ) credentials = assume_role_response["Credentials"] if not settings.TEST_SERVER_MODE: assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00" assert assume_role_response["AssumedRoleUser"]["Arn"] == ( f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{fed_name}" ) @freeze_time("2012-01-01 12:00:00") @mock_aws def test_assume_role_with_saml_when_saml_attribute_not_provided(): """Test session duration when saml attribute are not provided. Assume role should default the session duration to 3600 seconds when a saml attribute is not provided. """ client = boto3.client("sts", region_name="us-east-1") role_name = "test-role" provider_name = "TestProvFed" fed_identifier = "7ca82df9-1bad-4dd3-9b2b-adb68b554282" fed_name = "testuser" role_input = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" principal_role = f"arn:aws:iam:{ACCOUNT_ID}:saml-provider/{provider_name}" saml_assertion = f""" http://localhost/ http://localhost:3000/ NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= NTIyMzk0ZGI4MjI0ZjI5ZGNhYjkyOGQyZGQ1NTZjODViZjk5YTY4ODFjOWRjNjkyYzZmODY2ZDQ4NjlkZjY3YSAgLQo= {fed_identifier} urn:amazon:webservices arn:aws:iam::{ACCOUNT_ID}:saml-provider/{provider_name},arn:aws:iam::{ACCOUNT_ID}:role/{role_name} {fed_name} urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport """.replace("\n", "") assume_role_response = client.assume_role_with_saml( RoleArn=role_input, PrincipalArn=principal_role, SAMLAssertion=b64encode(saml_assertion.encode("utf-8")).decode("utf-8"), ) credentials = assume_role_response["Credentials"] assert "Expiration" in credentials if not settings.TEST_SERVER_MODE: assert credentials["Expiration"].isoformat() == "2012-01-01T13:00:00+00:00" @freeze_time("2012-01-01 12:00:00") @mock_aws @pytest.mark.parametrize( "region,partition", [("us-east-1", "aws"), ("cn-north-1", "aws-cn")] ) def test_assume_role_with_web_identity_boto3(region, partition): client = boto3.client("sts", region_name=region) policy = json.dumps( { "Statement": [ { "Sid": "Stmt13690092345534", "Action": ["S3:ListBucket"], "Effect": "Allow", "Resource": ["arn:aws:s3:::foobar-tester"], } ] } ) role_name = "test-role" s3_role = f"arn:aws:iam::{ACCOUNT_ID}:role/{role_name}" session_name = "session-name" role = client.assume_role_with_web_identity( RoleArn=s3_role, RoleSessionName=session_name, WebIdentityToken="????", Policy=policy, DurationSeconds=903, ) creds = role["Credentials"] user = role["AssumedRoleUser"] assert isinstance(creds["Expiration"], datetime) if not settings.TEST_SERVER_MODE: fdate = creds["Expiration"].strftime("%Y-%m-%dT%H:%M:%S.000Z") assert fdate == "2012-01-01T12:15:03.000Z" assert len(creds["SessionToken"]) == 356 assert re.match("^FQoGZXIvYXdzE", creds["SessionToken"]) assert len(creds["AccessKeyId"]) == 20 assert re.match("^ASIA", creds["AccessKeyId"]) assert len(creds["SecretAccessKey"]) == 40 assert user["Arn"] == ( f"arn:{partition}:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{session_name}" ) assert "session-name" in user["AssumedRoleId"] @mock_aws @pytest.mark.parametrize( "region,partition", [("us-east-1", "aws"), ("cn-north-1", "aws-cn")] ) def test_get_caller_identity_with_default_credentials(region, partition): identity = boto3.client("sts", region_name=region).get_caller_identity() assert identity["Arn"] == f"arn:{partition}:sts::{ACCOUNT_ID}:user/moto" assert identity["UserId"] == "AKIAIOSFODNN7EXAMPLE" assert identity["Account"] == str(ACCOUNT_ID) @mock_aws @pytest.mark.parametrize( "region,partition", [("us-east-1", "aws"), ("cn-north-1", "aws-cn")] ) def test_get_caller_identity_with_iam_user_credentials(region, partition): iam_client = boto3.client("iam", region_name=region) iam_user_name = "new-user" iam_user = iam_client.create_user(UserName=iam_user_name)["User"] assert iam_user["Arn"] == f"arn:{partition}:iam::123456789012:user/new-user" access_key = iam_client.create_access_key(UserName=iam_user_name)["AccessKey"] identity = boto3.client( "sts", region_name=region, aws_access_key_id=access_key["AccessKeyId"], aws_secret_access_key=access_key["SecretAccessKey"], ).get_caller_identity() assert identity["Arn"] == iam_user["Arn"] assert identity["UserId"] == iam_user["UserId"] assert identity["Account"] == str(ACCOUNT_ID) @mock_aws @pytest.mark.parametrize( "region,partition", [("us-east-1", "aws"), ("cn-north-1", "aws-cn")] ) def test_get_caller_identity_with_assumed_role_credentials(region, partition): iam_client = boto3.client("iam", region_name=region) sts_client = boto3.client("sts", region_name=region) iam_role_name = "new-user" trust_policy_document = { "Version": "2012-10-17", "Statement": { "Effect": "Allow", "Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"}, "Action": "sts:AssumeRole", }, } iam_role_arn = iam_client.role_arn = iam_client.create_role( RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(trust_policy_document), )["Role"]["Arn"] session_name = "new-session" assumed_role = sts_client.assume_role( RoleArn=iam_role_arn, RoleSessionName=session_name ) assert ( assumed_role["AssumedRoleUser"]["Arn"] == f"arn:{partition}:sts::123456789012:assumed-role/new-user/new-session" ) access_key = assumed_role["Credentials"] identity = boto3.client( "sts", region_name="us-east-1", aws_access_key_id=access_key["AccessKeyId"], aws_secret_access_key=access_key["SecretAccessKey"], ).get_caller_identity() assert identity["Arn"] == assumed_role["AssumedRoleUser"]["Arn"] assert identity["UserId"] == assumed_role["AssumedRoleUser"]["AssumedRoleId"] assert identity["Account"] == str(ACCOUNT_ID) @mock_aws def test_federation_token_with_too_long_policy(): """Test federation token with policy longer than 2048 character fails.""" cli = boto3.client("sts", region_name="us-east-1") resource_tmpl = ( "arn:aws:s3:::yyyy-xxxxx-cloud-default/my_default_folder/folder-name-%s/*" ) statements = [] for num in range(30): statements.append( { "Effect": "Allow", "Action": ["s3:*"], "Resource": resource_tmpl % str(num), } ) policy = {"Version": "2012-10-17", "Statement": statements} json_policy = json.dumps(policy) assert len(json_policy) > MAX_FEDERATION_TOKEN_POLICY_LENGTH with pytest.raises(ClientError) as ex: cli.get_federation_token(Name="foo", DurationSeconds=3600, Policy=json_policy) assert ex.value.response["Error"]["Code"] == "ValidationError" assert ( str(MAX_FEDERATION_TOKEN_POLICY_LENGTH) in ex.value.response["Error"]["Message"] ) @patch.dict("os.environ", {"MOTO_ENABLE_ISO_REGIONS": "true"}) @pytest.mark.parametrize("region", ["us-west-2", "cn-northwest-1", "us-isob-east-1"]) @mock_aws def test_sts_regions(region): client = boto3.client("sts", region_name=region) resp = client.get_caller_identity() assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200