Techdebt: Replace sure with regular assertions in STS (#6632)

This commit is contained in:
kbalk 2023-08-10 18:03:47 -04:00 committed by GitHub
parent deb914fc54
commit deba2e38cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 90 deletions

View File

@ -1,20 +1,15 @@
import sure # noqa # pylint: disable=unused-import
"""Test the different server responses."""
import moto.server as server
"""
Test the different server responses
"""
def test_sts_get_session_token():
backend = server.create_backend_app("sts")
test_client = backend.test_client()
res = test_client.get("/?Action=GetSessionToken")
res.status_code.should.equal(200)
res.data.should.contain(b"SessionToken")
res.data.should.contain(b"AccessKeyId")
assert res.status_code == 200
assert b"SessionToken" in res.data
assert b"AccessKeyId" in res.data
def test_sts_get_federation_token():
@ -22,9 +17,9 @@ def test_sts_get_federation_token():
test_client = backend.test_client()
res = test_client.get("/?Action=GetFederationToken&Name=Bob")
res.status_code.should.equal(200)
res.data.should.contain(b"SessionToken")
res.data.should.contain(b"AccessKeyId")
assert res.status_code == 200
assert b"SessionToken" in res.data
assert b"AccessKeyId" in res.data
def test_sts_get_caller_identity():
@ -32,10 +27,10 @@ def test_sts_get_caller_identity():
test_client = backend.test_client()
res = test_client.get("/?Action=GetCallerIdentity")
res.status_code.should.equal(200)
res.data.should.contain(b"Arn")
res.data.should.contain(b"UserId")
res.data.should.contain(b"Account")
assert res.status_code == 200
assert b"Arn" in res.data
assert b"UserId" in res.data
assert b"Account" in res.data
def test_sts_wellformed_xml():
@ -43,8 +38,8 @@ def test_sts_wellformed_xml():
test_client = backend.test_client()
res = test_client.get("/?Action=GetFederationToken&Name=Bob")
res.data.should_not.contain(b"\n")
assert b"\n" not in res.data
res = test_client.get("/?Action=GetSessionToken")
res.data.should_not.contain(b"\n")
assert b"\n" not in res.data
res = test_client.get("/?Action=GetCallerIdentity")
res.data.should_not.contain(b"\n")
assert b"\n" not in res.data

View File

@ -1,13 +1,13 @@
from base64 import b64encode
from datetime import datetime
import json
import re
from unittest.mock import patch
import boto3
from botocore.client import ClientError
from datetime import datetime
from freezegun import freeze_time
from unittest.mock import patch
import pytest
import sure # noqa # pylint: disable=unused-import
from moto import mock_sts, mock_iam, settings
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
@ -20,16 +20,19 @@ def test_get_session_token_boto3():
client = boto3.client("sts", region_name="us-east-1")
creds = client.get_session_token(DurationSeconds=903)["Credentials"]
creds["Expiration"].should.be.a(datetime)
assert isinstance(creds["Expiration"], datetime)
if not settings.TEST_SERVER_MODE:
fdate = creds["Expiration"].strftime("%Y-%m-%dT%H:%M:%S.000Z")
fdate.should.equal("2012-01-01T12:15:03.000Z")
creds["SessionToken"].should.equal(
"AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE"
assert fdate == "2012-01-01T12:15:03.000Z"
assert creds["SessionToken"] == (
"AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrR"
"h3c/LTo6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb"
"4IgRmpRV3zrkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7"
"b15fjrBs2+cTQtpZ3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE"
)
creds["AccessKeyId"].should.equal("AKIAIOSFODNN7EXAMPLE")
creds["SecretAccessKey"].should.equal("wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY")
assert creds["AccessKeyId"] == "AKIAIOSFODNN7EXAMPLE"
assert creds["SecretAccessKey"] == "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY"
@freeze_time("2012-01-01 12:00:00")
@ -41,21 +44,24 @@ def test_get_federation_token_boto3():
creds = fed_token["Credentials"]
fed_user = fed_token["FederatedUser"]
creds["Expiration"].should.be.a(datetime)
assert isinstance(creds["Expiration"], datetime)
if not settings.TEST_SERVER_MODE:
fdate = creds["Expiration"].strftime("%Y-%m-%dT%H:%M:%S.000Z")
fdate.should.equal("2012-01-01T12:15:03.000Z")
creds["SessionToken"].should.equal(
"AQoDYXdzEPT//////////wEXAMPLEtc764bNrC9SAPBSM22wDOk4x4HIZ8j4FZTwdQWLWsKWHGBuFqwAeMicRXmxfpSPfIeoIYRqTflfKD8YUuwthAx7mSEI/qkPpKPi/kMcGdQrmGdeehM4IC1NtBmUpp2wUE8phUZampKsburEDy0KPkyQDYwT7WZ0wq5VSXDvp75YU9HFvlRd8Tx6q6fE8YQcHNVXAkiY9q6d+xo0rKwT38xVqr7ZD0u0iPPkUL64lIZbqBAz+scqKmlzm8FDrypNC9Yjc8fPOLn9FX9KSYvKTr4rvx3iSIlTJabIQwj2ICCR/oLxBA=="
assert fdate == "2012-01-01T12:15:03.000Z"
assert creds["SessionToken"] == (
"AQoDYXdzEPT//////////wEXAMPLEtc764bNrC9SAPBSM22wDOk4x4HIZ8j4FZ"
"TwdQWLWsKWHGBuFqwAeMicRXmxfpSPfIeoIYRqTflfKD8YUuwthAx7mSEI/qkP"
"pKPi/kMcGdQrmGdeehM4IC1NtBmUpp2wUE8phUZampKsburEDy0KPkyQDYwT7W"
"Z0wq5VSXDvp75YU9HFvlRd8Tx6q6fE8YQcHNVXAkiY9q6d+xo0rKwT38xVqr7Z"
"D0u0iPPkUL64lIZbqBAz+scqKmlzm8FDrypNC9Yjc8fPOLn9FX9KSYvKTr4rvx"
"3iSIlTJabIQwj2ICCR/oLxBA=="
)
creds["AccessKeyId"].should.equal("AKIAIOSFODNN7EXAMPLE")
creds["SecretAccessKey"].should.equal("wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY")
assert creds["AccessKeyId"] == "AKIAIOSFODNN7EXAMPLE"
assert creds["SecretAccessKey"] == "wJalrXUtnFEMI/K7MDENG/bPxRfiCYzEXAMPLEKEY"
fed_user["Arn"].should.equal(
f"arn:aws:sts::{ACCOUNT_ID}:federated-user/{token_name}"
)
fed_user["FederatedUserId"].should.equal(f"{ACCOUNT_ID}:{token_name}")
assert fed_user["Arn"] == (f"arn:aws:sts::{ACCOUNT_ID}:federated-user/{token_name}")
assert fed_user["FederatedUserId"] == f"{ACCOUNT_ID}:{token_name}"
@freeze_time("2012-01-01 12:00:00")
@ -101,14 +107,14 @@ def test_assume_role():
credentials = assume_role_response["Credentials"]
if not settings.TEST_SERVER_MODE:
credentials["Expiration"].isoformat().should.equal("2012-01-01T12:15:00+00:00")
credentials["SessionToken"].should.have.length_of(356)
assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00"
assert len(credentials["SessionToken"]) == 356
assert credentials["SessionToken"].startswith("FQoGZXIvYXdzE")
credentials["AccessKeyId"].should.have.length_of(20)
assert len(credentials["AccessKeyId"]) == 20
assert credentials["AccessKeyId"].startswith("ASIA")
credentials["SecretAccessKey"].should.have.length_of(40)
assert len(credentials["SecretAccessKey"]) == 40
assume_role_response["AssumedRoleUser"]["Arn"].should.equal(
assert assume_role_response["AssumedRoleUser"]["Arn"] == (
f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{session_name}"
)
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith("AROA")
@ -119,7 +125,7 @@ def test_assume_role():
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].endswith(
":" + session_name
)
assume_role_response["AssumedRoleUser"]["AssumedRoleId"].should.have.length_of(
assert len(assume_role_response["AssumedRoleUser"]["AssumedRoleId"]) == (
21 + 1 + len(session_name)
)
@ -203,21 +209,21 @@ def test_assume_role_with_saml():
credentials = assume_role_response["Credentials"]
if not settings.TEST_SERVER_MODE:
credentials["Expiration"].isoformat().should.equal("2012-01-01T12:15:00+00:00")
credentials["SessionToken"].should.have.length_of(356)
assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00"
assert len(credentials["SessionToken"]) == 356
assert credentials["SessionToken"].startswith("FQoGZXIvYXdzE")
credentials["AccessKeyId"].should.have.length_of(20)
assert len(credentials["AccessKeyId"]) == 20
assert credentials["AccessKeyId"].startswith("ASIA")
credentials["SecretAccessKey"].should.have.length_of(40)
assert len(credentials["SecretAccessKey"]) == 40
assume_role_response["AssumedRoleUser"]["Arn"].should.equal(
assert assume_role_response["AssumedRoleUser"]["Arn"] == (
f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{fed_name}"
)
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].startswith("AROA")
assert assume_role_response["AssumedRoleUser"]["AssumedRoleId"].endswith(
f":{fed_name}"
)
assume_role_response["AssumedRoleUser"]["AssumedRoleId"].should.have.length_of(
assert len(assume_role_response["AssumedRoleUser"]["AssumedRoleId"]) == (
21 + 1 + len(f"{fed_name}")
)
@ -301,9 +307,9 @@ def test_assume_role_with_saml_should_not_rely_on_attribute_order():
credentials = assume_role_response["Credentials"]
if not settings.TEST_SERVER_MODE:
credentials["Expiration"].isoformat().should.equal("2012-01-01T12:15:00+00:00")
assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00"
assume_role_response["AssumedRoleUser"]["Arn"].should.equal(
assert assume_role_response["AssumedRoleUser"]["Arn"] == (
f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{fed_name}"
)
@ -387,16 +393,21 @@ def test_assume_role_with_saml_should_respect_xml_namespaces():
credentials = assume_role_response["Credentials"]
if not settings.TEST_SERVER_MODE:
credentials["Expiration"].isoformat().should.equal("2012-01-01T12:15:00+00:00")
assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00"
assume_role_response["AssumedRoleUser"]["Arn"].should.equal(
assert assume_role_response["AssumedRoleUser"]["Arn"] == (
f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{fed_name}"
)
@freeze_time("2012-01-01 12:00:00")
@mock_sts
def test_assume_role_with_saml_should_retrieve_attribute_value_from_text_when_xml_tag_contains_xmlns_attributes():
def test_assume_role_with_saml_when_xml_tag_contains_xmlns_attributes():
"""Test assume role with saml when xml tag contains xmlns attributes.
Sssume role with saml should retrieve attribute value when xml tag
contains xmlns attributes.
"""
client = boto3.client("sts", region_name="us-east-1")
role_name = "test-role"
provider_name = "TestProvFed"
@ -479,16 +490,21 @@ def test_assume_role_with_saml_should_retrieve_attribute_value_from_text_when_xm
credentials = assume_role_response["Credentials"]
if not settings.TEST_SERVER_MODE:
credentials["Expiration"].isoformat().should.equal("2012-01-01T12:15:00+00:00")
assert credentials["Expiration"].isoformat() == "2012-01-01T12:15:00+00:00"
assume_role_response["AssumedRoleUser"]["Arn"].should.equal(
assert assume_role_response["AssumedRoleUser"]["Arn"] == (
f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{fed_name}"
)
@freeze_time("2012-01-01 12:00:00")
@mock_sts
def test_assume_role_with_saml_should_default_session_duration_to_3600_seconds_when_saml_attribute_not_provided():
def test_assume_role_with_saml_when_saml_attribute_not_provided():
"""Test session duration when saml attribute are not provided.
Assume role should default the session duration to 3600 seconds when
a saml attribute is not provided.
"""
client = boto3.client("sts", region_name="us-east-1")
role_name = "test-role"
provider_name = "TestProvFed"
@ -561,9 +577,9 @@ def test_assume_role_with_saml_should_default_session_duration_to_3600_seconds_w
)
credentials = assume_role_response["Credentials"]
credentials.should.have.key("Expiration")
assert "Expiration" in credentials
if not settings.TEST_SERVER_MODE:
credentials["Expiration"].isoformat().should.equal("2012-01-01T13:00:00+00:00")
assert credentials["Expiration"].isoformat() == "2012-01-01T13:00:00+00:00"
@freeze_time("2012-01-01 12:00:00")
@ -597,31 +613,31 @@ def test_assume_role_with_web_identity_boto3():
creds = role["Credentials"]
user = role["AssumedRoleUser"]
creds["Expiration"].should.be.a(datetime)
assert isinstance(creds["Expiration"], datetime)
if not settings.TEST_SERVER_MODE:
fdate = creds["Expiration"].strftime("%Y-%m-%dT%H:%M:%S.000Z")
fdate.should.equal("2012-01-01T12:15:03.000Z")
assert fdate == "2012-01-01T12:15:03.000Z"
creds["SessionToken"].should.have.length_of(356)
creds["SessionToken"].should.match("^FQoGZXIvYXdzE")
creds["AccessKeyId"].should.have.length_of(20)
creds["AccessKeyId"].should.match("^ASIA")
creds["SecretAccessKey"].should.have.length_of(40)
assert len(creds["SessionToken"]) == 356
assert re.match("^FQoGZXIvYXdzE", creds["SessionToken"])
assert len(creds["AccessKeyId"]) == 20
assert re.match("^ASIA", creds["AccessKeyId"])
assert len(creds["SecretAccessKey"]) == 40
user["Arn"].should.equal(
assert user["Arn"] == (
f"arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{session_name}"
)
user["AssumedRoleId"].should.contain("session-name")
assert "session-name" in user["AssumedRoleId"]
@mock_sts
def test_get_caller_identity_with_default_credentials():
identity = boto3.client("sts", region_name="us-east-1").get_caller_identity()
identity["Arn"].should.equal(f"arn:aws:sts::{ACCOUNT_ID}:user/moto")
identity["UserId"].should.equal("AKIAIOSFODNN7EXAMPLE")
identity["Account"].should.equal(str(ACCOUNT_ID))
assert identity["Arn"] == f"arn:aws:sts::{ACCOUNT_ID}:user/moto"
assert identity["UserId"] == "AKIAIOSFODNN7EXAMPLE"
assert identity["Account"] == str(ACCOUNT_ID)
@mock_sts
@ -639,9 +655,9 @@ def test_get_caller_identity_with_iam_user_credentials():
aws_secret_access_key=access_key["SecretAccessKey"],
).get_caller_identity()
identity["Arn"].should.equal(iam_user["Arn"])
identity["UserId"].should.equal(iam_user["UserId"])
identity["Account"].should.equal(str(ACCOUNT_ID))
assert identity["Arn"] == iam_user["Arn"]
assert identity["UserId"] == iam_user["UserId"]
assert identity["Account"] == str(ACCOUNT_ID)
@mock_sts
@ -675,14 +691,14 @@ def test_get_caller_identity_with_assumed_role_credentials():
aws_secret_access_key=access_key["SecretAccessKey"],
).get_caller_identity()
identity["Arn"].should.equal(assumed_role["AssumedRoleUser"]["Arn"])
identity["UserId"].should.equal(assumed_role["AssumedRoleUser"]["AssumedRoleId"])
identity["Account"].should.equal(str(ACCOUNT_ID))
assert identity["Arn"] == assumed_role["AssumedRoleUser"]["Arn"]
assert identity["UserId"] == assumed_role["AssumedRoleUser"]["AssumedRoleId"]
assert identity["Account"] == str(ACCOUNT_ID)
@mock_sts
def test_federation_token_with_too_long_policy():
"Trying to get a federation token with a policy longer than 2048 character should fail"
"""Test federation token with policy longer than 2048 character fails."""
cli = boto3.client("sts", region_name="us-east-1")
resource_tmpl = (
"arn:aws:s3:::yyyy-xxxxx-cloud-default/my_default_folder/folder-name-%s/*"
@ -702,9 +718,9 @@ def test_federation_token_with_too_long_policy():
with pytest.raises(ClientError) as ex:
cli.get_federation_token(Name="foo", DurationSeconds=3600, Policy=json_policy)
ex.value.response["Error"]["Code"].should.equal("ValidationError")
ex.value.response["Error"]["Message"].should.contain(
str(MAX_FEDERATION_TOKEN_POLICY_LENGTH)
assert ex.value.response["Error"]["Code"] == "ValidationError"
assert (
str(MAX_FEDERATION_TOKEN_POLICY_LENGTH) in ex.value.response["Error"]["Message"]
)
@ -714,4 +730,4 @@ def test_federation_token_with_too_long_policy():
def test_sts_regions(region):
client = boto3.client("sts", region_name=region)
resp = client.get_caller_identity()
resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

View File

@ -1,7 +1,7 @@
import boto3
from base64 import b64encode
import unittest
from base64 import b64encode
import boto3
from moto import mock_dynamodb, mock_sts, mock_iam
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
@ -32,7 +32,7 @@ class TestStsAssumeRole(unittest.TestCase):
region_name="us-east-1",
)
assumed_arn = sts_account_b.get_caller_identity()["Arn"]
assumed_arn.should.equal(
assert assumed_arn == (
f"arn:aws:sts::{self.account_b}:assumed-role/my-role/test-session-name"
)
iam_account_b = boto3.client(
@ -45,7 +45,7 @@ class TestStsAssumeRole(unittest.TestCase):
# Verify new users belong to the different account
user = iam_account_b.create_user(UserName="user-in-new-account")["User"]
user["Arn"].should.equal(
assert user["Arn"] == (
f"arn:aws:iam::{self.account_b}:user/user-in-new-account"
)
@ -135,7 +135,7 @@ class TestStsAssumeRole(unittest.TestCase):
# Verify new users belong to the different account
user = iam_account_b.create_user(UserName="user-in-new-account")["User"]
user["Arn"].should.equal(
assert user["Arn"] == (
f"arn:aws:iam::{self.account_b}:user/user-in-new-account"
)
@ -174,11 +174,11 @@ class TestStsAssumeRole(unittest.TestCase):
)
table = ddb_client.describe_table(TableName="table-in-default-account")["Table"]
table["TableArn"].should.equal(
assert table["TableArn"] == (
"arn:aws:dynamodb:us-east-1:123456789012:table/table-in-default-account"
)
table = ddb_account_b.describe_table(TableName="table-in-new-account")["Table"]
table["TableArn"].should.equal(
assert table["TableArn"] == (
f"arn:aws:dynamodb:us-east-1:{self.account_b}:table/table-in-new-account"
)