Techdebt: Replace sure with regular asserts in Core (#6507)
parent 1b2dfbaf56
commit e03d2eb024
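Every change in the diff below follows the same mechanical pattern: a `sure`-style expectation is rewritten as a plain `assert`, and the now-unused `import sure` lines are dropped. A minimal, self-contained sketch of the before/after (illustrative only, not copied from any single file in the diff):

    # Illustrative example of the rewrite applied throughout this commit
    response = {"ResponseMetadata": {"HTTPStatusCode": 200}}

    # Before: sure's monkey-patched assertion style (requires `import sure`)
    # response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)

    # After: a plain assert, with no extra dependency
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200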
@@ -28,29 +28,29 @@ class TestAccountIdResolution:
def test_environment_variable_takes_precedence(self):
# Verify ACCOUNT ID is standard
resp = self._get_caller_identity()
self._get_account_id(resp).should.equal(ACCOUNT_ID)
assert self._get_account_id(resp) == ACCOUNT_ID

# Specify environment variable, and verify this becomes the new ACCOUNT ID
os.environ["MOTO_ACCOUNT_ID"] = "111122223333"
resp = self._get_caller_identity()
self._get_account_id(resp).should.equal("111122223333")
assert self._get_account_id(resp) == "111122223333"

# Specify special request header - the environment variable should still take precedence
resp = self._get_caller_identity(
extra_headers={"x-moto-account-id": "333344445555"}
)
self._get_account_id(resp).should.equal("111122223333")
assert self._get_account_id(resp) == "111122223333"

# Remove the environment variable - the Request Header should now take precedence
del os.environ["MOTO_ACCOUNT_ID"]
resp = self._get_caller_identity(
extra_headers={"x-moto-account-id": "333344445555"}
)
self._get_account_id(resp).should.equal("333344445555")
assert self._get_account_id(resp) == "333344445555"

# Without Header, we're back to the regular account ID
resp = self._get_caller_identity()
self._get_account_id(resp).should.equal(ACCOUNT_ID)
assert self._get_account_id(resp) == ACCOUNT_ID

def _get_caller_identity(self, extra_headers=None):
data = "Action=GetCallerIdentity&Version=2011-06-15"
@@ -1,7 +1,6 @@
import json

import boto3
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError

import pytest
@@ -179,11 +178,10 @@ def test_invalid_client_token_id():
)
with pytest.raises(ClientError) as ex:
client.get_user()
ex.value.response["Error"]["Code"].should.equal("InvalidClientTokenId")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
"The security token included in the request is invalid."
)
err = ex.value.response["Error"]
assert err["Code"] == "InvalidClientTokenId"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert err["Message"] == "The security token included in the request is invalid."


@set_initial_no_auth_action_count(0)
@@ -197,10 +195,11 @@ def test_auth_failure():
)
with pytest.raises(ClientError) as ex:
client.describe_instances()
ex.value.response["Error"]["Code"].should.equal("AuthFailure")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(401)
ex.value.response["Error"]["Message"].should.equal(
"AWS was not able to validate the provided access credentials"
err = ex.value.response["Error"]
assert err["Code"] == "AuthFailure"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 401
assert (
err["Message"] == "AWS was not able to validate the provided access credentials"
)
@@ -216,10 +215,11 @@ def test_signature_does_not_match():
)
with pytest.raises(ClientError) as ex:
client.get_user()
ex.value.response["Error"]["Code"].should.equal("SignatureDoesNotMatch")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details."
assert ex.value.response["Error"]["Code"] == "SignatureDoesNotMatch"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== "The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details."
)


@@ -235,10 +235,11 @@ def test_auth_failure_with_valid_access_key_id():
)
with pytest.raises(ClientError) as ex:
client.describe_instances()
ex.value.response["Error"]["Code"].should.equal("AuthFailure")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(401)
ex.value.response["Error"]["Message"].should.equal(
"AWS was not able to validate the provided access credentials"
assert ex.value.response["Error"]["Code"] == "AuthFailure"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 401
assert (
ex.value.response["Error"]["Message"]
== "AWS was not able to validate the provided access credentials"
)
@@ -255,10 +256,11 @@ def test_access_denied_with_no_policy():
)
with pytest.raises(ClientError) as ex:
client.describe_instances()
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:DescribeInstances"
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:DescribeInstances"
)


@@ -281,10 +283,11 @@ def test_access_denied_with_not_allowing_policy():
)
with pytest.raises(ClientError) as ex:
client.describe_instances()
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:DescribeInstances"
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:DescribeInstances"
)
@@ -313,10 +316,11 @@ def test_access_denied_for_run_instances():
)
with pytest.raises(ClientError) as ex:
client.run_instances(MaxCount=1, MinCount=1)
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:RunInstances"
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:RunInstances"
)


@@ -342,10 +346,11 @@ def test_access_denied_with_denying_policy():
)
with pytest.raises(ClientError) as ex:
client.create_vpc(CidrBlock="10.0.0.0/16")
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:CreateVpc"
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:CreateVpc"
)
@@ -368,7 +373,10 @@ def test_get_caller_identity_allowed_with_denying_policy():
aws_access_key_id=access_key["AccessKeyId"],
aws_secret_access_key=access_key["SecretAccessKey"],
)
client.get_caller_identity().should.be.a(dict)
assert (
client.get_caller_identity()["Arn"]
== f"arn:aws:iam::{ACCOUNT_ID}:user/{user_name}"
)


@set_initial_no_auth_action_count(3)
@@ -388,7 +396,7 @@ def test_allowed_with_wildcard_action():
aws_access_key_id=access_key["AccessKeyId"],
aws_secret_access_key=access_key["SecretAccessKey"],
)
client.describe_tags()["Tags"].should.be.empty
assert client.describe_tags()["Tags"] == []


@set_initial_no_auth_action_count(4)
@@ -408,7 +416,7 @@ def test_allowed_with_explicit_action_in_attached_policy():
aws_access_key_id=access_key["AccessKeyId"],
aws_secret_access_key=access_key["SecretAccessKey"],
)
client.list_groups()["Groups"].should.be.empty
assert client.list_groups()["Groups"] == []


@set_initial_no_auth_action_count(8)
@@ -440,9 +448,9 @@ def test_s3_access_denied_with_denying_attached_group_policy():
)
with pytest.raises(ClientError) as ex:
client.list_buckets()
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal("Access Denied")
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert ex.value.response["Error"]["Message"] == "Access Denied"


@set_initial_no_auth_action_count(6)
@@ -474,9 +482,9 @@ def test_s3_access_denied_with_denying_inline_group_policy():
client.create_bucket(Bucket=bucket_name)
with pytest.raises(ClientError) as ex:
client.get_object(Bucket=bucket_name, Key="sdfsdf")
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal("Access Denied")
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert ex.value.response["Error"]["Message"] == "Access Denied"


@set_initial_no_auth_action_count(10)
@@ -520,10 +528,11 @@ def test_access_denied_with_many_irrelevant_policies():
)
with pytest.raises(ClientError) as ex:
client.create_key_pair(KeyName="TestKey")
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:CreateKeyPair"
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: ec2:CreateKeyPair"
)
@@ -573,11 +582,10 @@ def test_allowed_with_temporary_credentials():
aws_session_token=credentials["SessionToken"],
)
subnets = ec2_client.describe_subnets()["Subnets"]
len(subnets).should.be.greater_than(1)
elbv2_client.create_load_balancer(
Name="test-load-balancer",
Subnets=[subnets[0]["SubnetId"], subnets[1]["SubnetId"]],
)["LoadBalancers"].should.have.length_of(1)
assert len(subnets) > 1
subnet_ids = [subnets[0]["SubnetId"], subnets[1]["SubnetId"]]
resp = elbv2_client.create_load_balancer(Name="lb", Subnets=subnet_ids)
assert len(resp["LoadBalancers"]) == 1


@set_initial_no_auth_action_count(3)
@@ -617,10 +625,11 @@ def test_access_denied_with_temporary_credentials():
DBInstanceClass="db.t3",
Engine="aurora-postgresql",
)
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
f"User: arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{session_name} is not authorized to perform: rds:CreateDBInstance"
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== f"User: arn:aws:sts::{ACCOUNT_ID}:assumed-role/{role_name}/{session_name} is not authorized to perform: rds:CreateDBInstance"
)
@@ -641,7 +650,7 @@ def test_get_user_from_credentials():
aws_access_key_id=access_key["AccessKeyId"],
aws_secret_access_key=access_key["SecretAccessKey"],
)
client.get_user()["User"]["UserName"].should.equal(user_name)
assert client.get_user()["User"]["UserName"] == user_name


@set_initial_no_auth_action_count(0)
@@ -655,10 +664,11 @@ def test_s3_invalid_access_key_id():
)
with pytest.raises(ClientError) as ex:
client.list_buckets()
ex.value.response["Error"]["Code"].should.equal("InvalidAccessKeyId")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
"The AWS Access Key Id you provided does not exist in our records."
assert ex.value.response["Error"]["Code"] == "InvalidAccessKeyId"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== "The AWS Access Key Id you provided does not exist in our records."
)


@@ -677,10 +687,11 @@ def test_s3_signature_does_not_match():
client.create_bucket(Bucket=bucket_name)
with pytest.raises(ClientError) as ex:
client.put_object(Bucket=bucket_name, Key="abc")
ex.value.response["Error"]["Code"].should.equal("SignatureDoesNotMatch")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal(
"The request signature we calculated does not match the signature you provided. Check your key and signing method."
assert ex.value.response["Error"]["Code"] == "SignatureDoesNotMatch"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert (
ex.value.response["Error"]["Message"]
== "The request signature we calculated does not match the signature you provided. Check your key and signing method."
)


@@ -713,9 +724,9 @@ def test_s3_access_denied_not_action():
client.create_bucket(Bucket=bucket_name)
with pytest.raises(ClientError) as ex:
client.delete_object(Bucket=bucket_name, Key="sdfsdf")
ex.value.response["Error"]["Code"].should.equal("AccessDenied")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(403)
ex.value.response["Error"]["Message"].should.equal("Access Denied")
assert ex.value.response["Error"]["Code"] == "AccessDenied"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403
assert ex.value.response["Error"]["Message"] == "Access Denied"


@set_initial_no_auth_action_count(4)
@@ -751,8 +762,7 @@ def test_s3_invalid_token_with_temporary_credentials():
client.create_bucket(Bucket=bucket_name)
with pytest.raises(ClientError) as ex:
client.list_bucket_metrics_configurations(Bucket=bucket_name)
ex.value.response["Error"]["Code"].should.equal("InvalidToken")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.value.response["Error"]["Message"].should.equal(
"The provided token is malformed or otherwise invalid."
)
err = ex.value.response["Error"]
assert err["Code"] == "InvalidToken"
assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert err["Message"] == "The provided token is malformed or otherwise invalid."
@@ -19,28 +19,17 @@ class ExampleBackend(BaseBackend):

def test_backend_dict_returns_nothing_by_default():
backend_dict = BackendDict(ExampleBackend, "ebs")
list(backend_dict.items()).should.equal([])
assert list(backend_dict.items()) == []


def test_account_specific_dict_contains_known_regions():
backend_dict = BackendDict(ExampleBackend, "ec2")
backend_dict["account"].should.have.key("eu-north-1")
backend_dict["account"]["eu-north-1"].should.be.a(ExampleBackend)


def test_backend_dict_known_regions_can_be_retrieved_directly():
backend_dict = BackendDict(ExampleBackend, "ec2")
backend_dict["account"]["eu-west-1"].should.be.a(ExampleBackend)


def test_backend_dict_can_get_known_regions():
backend_dict = BackendDict(ExampleBackend, "ec2")["12345"]
backend_dict["us-east-1"].should.be.a(ExampleBackend)
assert isinstance(backend_dict["account"]["eu-north-1"], ExampleBackend)


def test_backend_dict_does_not_contain_unknown_regions():
backend_dict = BackendDict(ExampleBackend, "ec2")
backend_dict["account"].shouldnt.have.key("mars-south-1")
assert "mars-south-1" not in backend_dict["account"]


def test_backend_dict_fails_when_retrieving_unknown_regions():
@@ -53,35 +42,35 @@ def test_backend_dict_can_retrieve_for_specific_account():
backend_dict = BackendDict(ExampleBackend, "ec2")

# Random account does not exist
backend_dict.shouldnt.have.key("000000")
assert "000000" not in backend_dict

# Retrieve AccountSpecificBackend by assuming it exists
backend = backend_dict["012345"]
backend.should.be.a(AccountSpecificBackend)
assert isinstance(backend, AccountSpecificBackend)

backend.should.have.key("eu-north-1")
assert "eu-north-1" in backend
regional_backend = backend["eu-north-1"]
regional_backend.should.be.a(ExampleBackend)
regional_backend.region_name.should.equal("eu-north-1")
assert isinstance(regional_backend, ExampleBackend)
assert regional_backend.region_name == "eu-north-1"
# We always return a fixed account_id for now, until we have proper multi-account support
regional_backend.account_id.should.equal("012345")
assert regional_backend.account_id == "012345"


def test_backend_dict_can_ignore_boto3_regions():
backend_dict = BackendDict(ExampleBackend, "ec2", use_boto3_regions=False)
backend_dict["account"].get("us-east-1").should.equal(None)
assert backend_dict["account"].get("us-east-1") is None


def test_backend_dict_can_specify_additional_regions():
backend_dict = BackendDict(
ExampleBackend, "ec2", additional_regions=["region1", "global"]
)["123456"]
backend_dict["us-east-1"].should.be.a(ExampleBackend)
backend_dict["region1"].should.be.a(ExampleBackend)
backend_dict["global"].should.be.a(ExampleBackend)
assert isinstance(backend_dict["us-east-1"], ExampleBackend)
assert isinstance(backend_dict["region1"], ExampleBackend)
assert isinstance(backend_dict["global"], ExampleBackend)

# Unknown regions still do not exist
backend_dict.get("us-east-3").should.equal(None)
assert backend_dict.get("us-east-3") is None


class TestMultiThreadedAccess:
@@ -128,7 +117,7 @@ class TestMultiThreadedAccess:
for x in threads:
x.join()

self.backend["123456789012"]["us-east-1"].data.should.have.length_of(15)
assert len(self.backend["123456789012"]["us-east-1"].data) == 15


def test_backend_dict_can_be_hashed():
@@ -136,7 +125,7 @@ def test_backend_dict_can_be_hashed():
for backend in [ExampleBackend, set, list, BaseBackend]:
hashes.append(BackendDict(backend, "n/a").__hash__())
# Hash is different for different backends
set(hashes).should.have.length_of(4)
assert len(set(hashes)) == 4


def test_account_specific_dict_can_be_hashed():
@@ -146,7 +135,7 @@ def test_account_specific_dict_can_be_hashed():
asb = _create_asb(accnt_id)
hashes.append(asb.__hash__())
# Hash is different for different accounts
set(hashes).should.have.length_of(5)
assert len(set(hashes)) == 5


def _create_asb(account_id, backend=None, use_boto3_regions=False, regions=None):
@@ -160,7 +149,6 @@ def _create_asb(account_id, backend=None, use_boto3_regions=False, regions=None)


def test_multiple_backends_cache_behaviour():

ec2 = BackendDict(EC2Backend, "ec2")
ec2_useast1 = ec2[DEFAULT_ACCOUNT_ID]["us-east-1"]
assert type(ec2_useast1) == EC2Backend
@@ -1,4 +1,3 @@
import sure # noqa # pylint: disable=unused-import
import boto3
from moto import mock_sqs, settings
from tests import DEFAULT_ACCOUNT_ID
@@ -11,4 +10,4 @@ def test_context_manager_returns_mock():

if not settings.TEST_SERVER_MODE:
backend = sqs_mock.backends[DEFAULT_ACCOUNT_ID]["us-west-1"]
list(backend.queues.keys()).should.equal(["queue1"])
assert list(backend.queues.keys()) == ["queue1"]
@@ -199,7 +199,7 @@ class TestWithPublicMethod(unittest.TestCase):
self.ensure_bucket_exists()

s3 = boto3.client("s3", region_name="us-east-1")
s3.head_bucket(Bucket="mybucket").shouldnt.equal(None)
assert s3.head_bucket(Bucket="mybucket") is not None

def test_should_not_find_bucket(self) -> None:
s3 = boto3.client("s3", region_name="us-east-1")
@@ -216,7 +216,7 @@ class TestWithPseudoPrivateMethod(unittest.TestCase):
def test_should_find_bucket(self) -> None:
self._ensure_bucket_exists()
s3 = boto3.client("s3", region_name="us-east-1")
s3.head_bucket(Bucket="mybucket").shouldnt.equal(None)
assert s3.head_bucket(Bucket="mybucket") is not None

def test_should_not_find_bucket(self) -> None:
s3 = boto3.client("s3", region_name="us-east-1")
@@ -241,7 +241,7 @@ class Baseclass(unittest.TestCase):
class TestSetUpInBaseClass(Baseclass):
def test_a_thing(self) -> None:
# Verify that we can 'see' the setUp-method in the parent class
self.client.head_bucket(Bucket="testbucket").shouldnt.equal(None)
assert self.client.head_bucket(Bucket="testbucket") is not None


@mock_s3
@@ -1,5 +1,4 @@
import os
import sure # noqa # pylint: disable=unused-import
from moto import mock_ec2, mock_s3

KEY = "AWS_ACCESS_KEY_ID"
@@ -8,7 +7,7 @@ KEY = "AWS_ACCESS_KEY_ID"
def test_aws_keys_are_patched():
with mock_ec2():
patched_value = os.environ[KEY]
patched_value.should.equal("foobar_key")
assert patched_value == "foobar_key"


def test_aws_keys_can_be_none():
@@ -26,7 +25,7 @@ def test_aws_keys_can_be_none():
# Verify that the os.environ[KEY] is patched
with mock_s3():
patched_value = os.environ[KEY]
patched_value.should.equal("foobar_key")
assert patched_value == "foobar_key"
# Verify that the os.environ[KEY] is unpatched, and reverts to None
assert os.environ.get(KEY) is None
finally:
@@ -1,6 +1,5 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from moto import mock_s3
from moto import settings
from unittest import SkipTest
@@ -25,7 +24,7 @@ def test_mock_works_with_client_created_inside(
client = boto3.client("s3", region_name="us-east-1")

b = client.list_buckets()
b["Buckets"].should.equal([])
assert b["Buckets"] == []
m.stop()


@@ -45,7 +44,7 @@ def test_mock_works_with_client_created_outside(
patch_client(outside_client)

b = outside_client.list_buckets()
b["Buckets"].should.equal([])
assert b["Buckets"] == []
m.stop()


@@ -65,7 +64,7 @@ def test_mock_works_with_resource_created_outside(
patch_resource(outside_resource)

b = list(outside_resource.buckets.all())
b.should.equal([])
assert b == []
m.stop()


@@ -118,7 +117,6 @@ class ImportantBusinessLogic:
def test_mock_works_when_replacing_client(
aws_credentials,
): # pylint: disable=unused-argument

logic = ImportantBusinessLogic()

m = mock_s3()
@@ -133,6 +131,6 @@ def test_mock_works_when_replacing_client(
client_initialized_after_mock = boto3.client("s3", region_name="us-east-1")
logic._s3 = client_initialized_after_mock
# This will work, as we now use a properly mocked client
logic.do_important_things().should.equal([])
assert logic.do_important_things() == []

m.stop()
@@ -1,4 +1,3 @@
import sure # noqa # pylint: disable=unused-import
import requests

from moto import mock_ec2, settings
@@ -12,7 +11,7 @@ else:
@mock_ec2
def test_latest_meta_data():
res = requests.get(f"{BASE_URL}/latest/meta-data/")
res.content.should.equal(b"iam")
assert res.content == b"iam"


@mock_ec2
@@ -20,16 +19,16 @@ def test_meta_data_iam():
res = requests.get(f"{BASE_URL}/latest/meta-data/iam")
json_response = res.json()
default_role = json_response["security-credentials"]["default-role"]
default_role.should.contain("AccessKeyId")
default_role.should.contain("SecretAccessKey")
default_role.should.contain("Token")
default_role.should.contain("Expiration")
assert "AccessKeyId" in default_role
assert "SecretAccessKey" in default_role
assert "Token" in default_role
assert "Expiration" in default_role


@mock_ec2
def test_meta_data_security_credentials():
res = requests.get(f"{BASE_URL}/latest/meta-data/iam/security-credentials/")
res.content.should.equal(b"default-role")
assert res.content == b"default-role"


@mock_ec2
@@ -38,7 +37,7 @@ def test_meta_data_default_role():
f"{BASE_URL}/latest/meta-data/iam/security-credentials/default-role"
)
json_response = res.json()
json_response.should.contain("AccessKeyId")
json_response.should.contain("SecretAccessKey")
json_response.should.contain("Token")
json_response.should.contain("Expiration")
assert "AccessKeyId" in json_response
assert "SecretAccessKey" in json_response
assert "Token" in json_response
assert "Expiration" in json_response
@@ -1,6 +1,5 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import

from moto import mock_all

@@ -10,15 +9,15 @@ def test_decorator() -> None:
rgn = "us-east-1"
sqs = boto3.client("sqs", region_name=rgn)
r = sqs.list_queues()
r["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
assert r["ResponseMetadata"]["HTTPStatusCode"] == 200

lmbda = boto3.client("lambda", region_name=rgn)
r = lmbda.list_event_source_mappings()
r["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
assert r["ResponseMetadata"]["HTTPStatusCode"] == 200

ddb = boto3.client("dynamodb", region_name=rgn)
r = ddb.list_tables()
r["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
assert r["ResponseMetadata"]["HTTPStatusCode"] == 200


def test_context_manager() -> None:
@@ -27,7 +26,7 @@ def test_context_manager() -> None:
with mock_all():
sqs = boto3.client("sqs", region_name=rgn)
r = sqs.list_queues()
r["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
assert r["ResponseMetadata"]["HTTPStatusCode"] == 200

unpatched_sqs = boto3.Session().client("sqs", region_name=rgn)
@@ -13,14 +13,14 @@ def test_use_invalid_region():
client = boto3.client("sns", region_name="any-region")
with pytest.raises(KeyError) as exc:
client.list_platform_applications()
str(exc.value).should.contain("any-region")
assert "any-region" in str(exc.value)


@mock_sns
@mock.patch.dict(os.environ, {"AWS_DEFAULT_REGION": "us-east-2"})
def test_use_region_from_env():
client = boto3.client("sns")
client.list_platform_applications()["PlatformApplications"].should.equal([])
assert client.list_platform_applications()["PlatformApplications"] == []


@mock_sns
@@ -31,7 +31,7 @@ def test_use_unknown_region_from_env():
client = boto3.client("sns")
with pytest.raises(KeyError) as exc:
client.list_platform_applications()
str(exc.value).should.contain("any-region")
assert "any-region" in str(exc.value)


@mock_sns
@@ -41,7 +41,7 @@ def test_use_unknown_region_from_env_but_allow_it():
if settings.TEST_SERVER_MODE:
raise SkipTest("Cannot set environemnt variables in ServerMode")
client = boto3.client("sns")
client.list_platform_applications()["PlatformApplications"].should.equal([])
assert client.list_platform_applications()["PlatformApplications"] == []


@mock_dynamodb
@@ -57,5 +57,4 @@ def test_use_unknown_region_from_env_but_allow_it__dynamo():
BillingMode="PAY_PER_REQUEST",
)
tables = list(dynamo_db.tables.all())
tables.should.have.length_of(1)
[table.name for table in tables].should.equal(["test_table"])
assert [table.name for table in tables] == ["test_table"]
@@ -1,4 +1,3 @@
import sure # noqa # pylint: disable=unused-import
import requests

import boto3
@@ -21,12 +20,12 @@ data_url = f"{base_url}/moto-api/data.json"
def test_reset_api():
conn = boto3.client("sqs", region_name="us-west-1")
conn.create_queue(QueueName="queue1")
conn.list_queues()["QueueUrls"].should.have.length_of(1)
assert len(conn.list_queues()["QueueUrls"]) == 1

res = requests.post(f"{base_url}/moto-api/reset")
res.content.should.equal(b'{"status": "ok"}')
assert res.content == b'{"status": "ok"}'

conn.list_queues().shouldnt.contain("QueueUrls") # No more queues
assert "QueueUrls" not in conn.list_queues() # No more queues


@mock_sqs
@@ -35,9 +34,9 @@ def test_data_api():
conn.create_queue(QueueName="queue1")

queues = requests.post(data_url).json()["sqs"]["Queue"]
len(queues).should.equal(1)
assert len(queues) == 1
queue = queues[0]
queue["name"].should.equal("queue1")
assert queue["name"] == "queue1"


@mock_s3
@@ -81,11 +80,10 @@ def test_creation_error__data_api_still_returns_thing():
_, _, x = response_instance.model_data(None, None, None)

as_objects = json.loads(x)["autoscaling"]
as_objects.should.have.key("FakeAutoScalingGroup")
assert len(as_objects["FakeAutoScalingGroup"]) >= 1

names = [obj["name"] for obj in as_objects["FakeAutoScalingGroup"]]
names.should.contain("test_asg")
assert "test_asg" in names
def test_model_data_is_emptied_as_necessary():
@@ -98,24 +96,24 @@ def test_model_data_is_emptied_as_necessary():
# No instances exist, because we have just reset it
for classes_per_service in model_data.values():
for _class in classes_per_service.values():
_class.instances.should.equal([])
assert _class.instances == []

with mock_sqs():
# When just starting a mock, it is empty
for classes_per_service in model_data.values():
for _class in classes_per_service.values():
_class.instances.should.equal([])
assert _class.instances == []

# After creating a queue, some data will be present
conn = boto3.client("sqs", region_name="us-west-1")
conn.create_queue(QueueName="queue1")

model_data["sqs"]["Queue"].instances.should.have.length_of(1)
assert len(model_data["sqs"]["Queue"].instances) == 1

# But after the mock ends, it is empty again
for classes_per_service in model_data.values():
for _class in classes_per_service.values():
_class.instances.should.equal([])
assert _class.instances == []

# When we have multiple/nested mocks, the data should still be present after the first mock ends
with mock_sqs():
@@ -123,9 +121,9 @@ def test_model_data_is_emptied_as_necessary():
conn.create_queue(QueueName="queue1")
with mock_s3():
# The data should still be here - instances should not reset if another mock is still active
model_data["sqs"]["Queue"].instances.should.have.length_of(1)
assert len(model_data["sqs"]["Queue"].instances) == 1
# The data should still be here - the inner mock has exited, but the outer mock is still active
model_data["sqs"]["Queue"].instances.should.have.length_of(1)
assert len(model_data["sqs"]["Queue"].instances) == 1


@mock_sqs
@@ -137,10 +135,10 @@ class TestModelDataResetForClassDecorator(TestCase):
# No data is present at the beginning
for classes_per_service in model_data.values():
for _class in classes_per_service.values():
_class.instances.should.equal([])
assert _class.instances == []

conn = boto3.client("sqs", region_name="us-west-1")
conn.create_queue(QueueName="queue1")

def test_should_find_bucket(self):
model_data["sqs"]["Queue"].instances.should.have.length_of(1)
assert len(model_data["sqs"]["Queue"].instances) == 1
@@ -1,7 +1,5 @@
import sure # noqa # pylint: disable=unused-import
import unittest

import boto3
import unittest

from moto import mock_sqs, mock_ec2
from tests import EXAMPLE_AMI_ID
@@ -16,7 +14,7 @@ class TestNestedDecoratorsBoto3(unittest.TestCase):
queue.send_message(MessageBody="test message 1")

queue.reload()
queue.attributes["ApproximateNumberOfMessages"].should.equal("1")
assert queue.attributes["ApproximateNumberOfMessages"] == "1"

@mock_ec2
def test_nested(self):
@@ -1,6 +1,5 @@
import requests
import pytest
import sure # noqa # pylint: disable=unused-import

import boto3
from moto import mock_s3, mock_sts, mock_sqs, settings
@@ -49,4 +48,4 @@ def test_decorator_ordering() -> None:
)

resp = requests.get(presigned_url)
resp.status_code.should.equal(200) # type: ignore[attr-defined]
assert resp.status_code == 200 # type: ignore[attr-defined]
@@ -1,6 +1,5 @@
import datetime
from unittest import SkipTest, mock
import sure # noqa # pylint: disable=unused-import

from collections import OrderedDict

@@ -46,17 +45,21 @@ def test_flatten_json_request_body():
}

flat = flatten_json_request_body("", body, spec)
flat["Name"].should.equal(body["Name"])
flat["Instances.Ec2KeyName"].should.equal(body["Instances"]["Ec2KeyName"])
assert flat["Name"] == body["Name"]
assert flat["Instances.Ec2KeyName"] == body["Instances"]["Ec2KeyName"]
for idx in range(2):
flat[
"Instances.InstanceGroups.member." + str(idx + 1) + ".InstanceRole"
].should.equal(body["Instances"]["InstanceGroups"][idx]["InstanceRole"])
flat[
"Instances.InstanceGroups.member." + str(idx + 1) + ".InstanceType"
].should.equal(body["Instances"]["InstanceGroups"][idx]["InstanceType"])
flat["Instances.Placement.AvailabilityZone"].should.equal(
body["Instances"]["Placement"]["AvailabilityZone"]
inst = body["Instances"]["InstanceGroups"][idx]
assert (
flat[f"Instances.InstanceGroups.member.{(idx + 1)}.InstanceRole"]
== inst["InstanceRole"]
)
assert (
flat[f"Instances.InstanceGroups.member.{(idx + 1)}.InstanceType"]
== inst["InstanceType"]
)
assert (
flat["Instances.Placement.AvailabilityZone"]
== body["Instances"]["Placement"]["AvailabilityZone"]
)

for idx in range(1):
@@ -64,21 +67,19 @@ def test_flatten_json_request_body():
step = body["Steps"][idx]["HadoopJarStep"]
i = 0
while prefix + ".Properties.member." + str(i + 1) + ".Key" in flat:
flat[prefix + ".Properties.member." + str(i + 1) + ".Key"].should.equal(
step["Properties"][i]["Key"]
)
flat[prefix + ".Properties.member." + str(i + 1) + ".Value"].should.equal(
step["Properties"][i]["Value"]
)
prop = step["Properties"][i]
assert flat[f"{prefix}.Properties.member.{(i + 1)}.Key"] == prop["Key"]
assert flat[f"{prefix}.Properties.member.{(i + 1)}.Value"] == prop["Value"]
i += 1
i = 0
while prefix + ".Args.member." + str(i + 1) in flat:
flat[prefix + ".Args.member." + str(i + 1)].should.equal(step["Args"][i])
assert flat[f"{prefix}.Args.member.{(i + 1)}"] == step["Args"][i]
i += 1

for idx in range(2):
flat["Configurations.member." + str(idx + 1) + ".Classification"].should.equal(
body["Configurations"][idx]["Classification"]
assert (
flat["Configurations.member." + str(idx + 1) + ".Classification"]
== body["Configurations"][idx]["Classification"]
)

props = {}
@@ -89,7 +90,7 @@ def test_flatten_json_request_body():
props[flat[key + ".key"]] = flat[key + ".value"]
i += 1
key = keyfmt.format(idx + 1, i)
props.should.equal(body["Configurations"][idx]["Properties"])
assert props == body["Configurations"][idx]["Properties"]
def test_parse_qs_unicode_decode_error():
@@ -123,32 +124,30 @@ def test_get_params():

result = subject._get_params()

result.should.equal(
{
"Action": "CreateRule",
"Version": "2015-12-01",
"ListenerArn": "arn:aws:elasticloadbalancing:us-east-1:1:listener/my-lb/50dc6c495c0c9188/80139731473870416",
"Priority": "100",
"Conditions": [
{
"Field": "http-header",
"HttpHeaderConfig": {
"HttpHeaderName": "User-Agent",
"Values": ["Mozilla", "curl"],
},
}
],
"Actions": [
{
"Type": "fixed-response",
"FixedResponseConfig": {
"StatusCode": "200",
"ContentType": "text/plain",
},
}
],
}
)
assert result == {
"Action": "CreateRule",
"Version": "2015-12-01",
"ListenerArn": "arn:aws:elasticloadbalancing:us-east-1:1:listener/my-lb/50dc6c495c0c9188/80139731473870416",
"Priority": "100",
"Conditions": [
{
"Field": "http-header",
"HttpHeaderConfig": {
"HttpHeaderName": "User-Agent",
"Values": ["Mozilla", "curl"],
},
}
],
"Actions": [
{
"Type": "fixed-response",
"FixedResponseConfig": {
"StatusCode": "200",
"ContentType": "text/plain",
},
}
],
}


def test_get_dict_list_params():
@@ -166,7 +165,7 @@ def test_get_dict_list_params():
# TODO: extend test and logic such that we can call subject._get_params() directly here
result = subject._get_multi_param_dict("VpcSecurityGroupIds")

result.should.equal({"VpcSecurityGroupId": ["sg-123", "sg-456", "sg-789"]})
assert result == {"VpcSecurityGroupId": ["sg-123", "sg-456", "sg-789"]}


def test_response_environment_preserved_by_type():
@@ -17,7 +17,6 @@ class TestResponsesModule(TestCase):
@mock_s3
@responses.activate
def test_moto_first(self):

"""
Verify we can activate a user-defined `responses` on top of our Moto mocks
"""
@@ -1,5 +1,3 @@
import sure # noqa # pylint: disable=unused-import

from unittest.mock import patch
from moto.server import main, create_backend_app, DomainDispatcherApplication

@@ -19,16 +17,16 @@ def test_wrong_arguments():
def test_right_arguments(run_simple):
main(["s3"])
func_call = run_simple.call_args[0]
func_call[0].should.equal("127.0.0.1")
func_call[1].should.equal(5000)
assert func_call[0] == "127.0.0.1"
assert func_call[1] == 5000


@patch("moto.server.run_simple")
def test_port_argument(run_simple):
main(["s3", "--port", "8080"])
func_call = run_simple.call_args[0]
func_call[0].should.equal("127.0.0.1")
func_call[1].should.equal(8080)
assert func_call[0] == "127.0.0.1"
assert func_call[1] == 8080


def test_domain_dispatched():
@@ -37,7 +35,7 @@ def test_domain_dispatched():
{"HTTP_HOST": "email.us-east1.amazonaws.com"}
)
keys = list(backend_app.view_functions.keys())
keys[0].should.equal("EmailResponse.dispatch")
assert keys[0] == "EmailResponse.dispatch"


def test_domain_dispatched_with_service():
@@ -45,4 +43,4 @@ def test_domain_dispatched_with_service():
dispatcher = DomainDispatcherApplication(create_backend_app, service="s3")
backend_app = dispatcher.get_application({"HTTP_HOST": "s3.us-east1.amazonaws.com"})
keys = set(backend_app.view_functions.keys())
keys.should.contain("S3Response.key_response")
assert "S3Response.key_response" in keys
@@ -1,7 +1,6 @@
import os
from unittest import mock
import pytest
import sure # noqa # pylint: disable=unused-import

from moto import settings

@@ -12,16 +11,16 @@ Sanity checks for interpretation of the MOTO_ECS_NEW_ARN-variable


def test_default_is_true():
settings.ecs_new_arn_format().should.equal(True)
assert settings.ecs_new_arn_format() is True


@pytest.mark.parametrize("value", ["TrUe", "true", "invalid", "0", "1"])
def test_anything_but_false_is_true(value):
with mock.patch.dict(os.environ, {"MOTO_ECS_NEW_ARN": value}):
settings.ecs_new_arn_format().should.equal(True)
assert settings.ecs_new_arn_format() is True


@pytest.mark.parametrize("value", ["False", "false", "faLse"])
def test_only_false_is_false(value):
with mock.patch.dict(os.environ, {"MOTO_ECS_NEW_ARN": value}):
settings.ecs_new_arn_format().should.equal(False)
assert settings.ecs_new_arn_format() is False
@@ -18,7 +18,6 @@ class TestMockBucketStartingWithServiceName:

@pytest.mark.parametrize("service_name,decorator", service_names)
def test_bucketname_starting_with_service_name(self, service_name, decorator):

decorator = getattr(moto, f"mock_{service_name}")
with decorator():
with mock_s3():
@@ -1,22 +1,22 @@
import sure # noqa # pylint: disable=unused-import

from moto.core.utils import convert_regex_to_flask_path


def test_flask_path_converting_simple():
convert_regex_to_flask_path("/").should.equal("/")
convert_regex_to_flask_path("/$").should.equal("/")
assert convert_regex_to_flask_path("/") == "/"
assert convert_regex_to_flask_path("/$") == "/"

convert_regex_to_flask_path("/foo").should.equal("/foo")
assert convert_regex_to_flask_path("/foo") == "/foo"

convert_regex_to_flask_path("/foo/bar/").should.equal("/foo/bar/")
assert convert_regex_to_flask_path("/foo/bar/") == "/foo/bar/"


def test_flask_path_converting_regex():
convert_regex_to_flask_path(r"/(?P<key_name>[a-zA-Z0-9\-_]+)").should.equal(
r'/<regex("[a-zA-Z0-9\-_]+"):key_name>'
assert (
convert_regex_to_flask_path(r"/(?P<key_name>[a-zA-Z0-9\-_]+)")
== r'/<regex("[a-zA-Z0-9\-_]+"):key_name>'
)

convert_regex_to_flask_path(
r"(?P<account_id>\d+)/(?P<queue_name>.*)$"
).should.equal(r'<regex("\d+"):account_id>/<regex(".*"):queue_name>')
assert (
convert_regex_to_flask_path(r"(?P<account_id>\d+)/(?P<queue_name>.*)$")
== r'<regex("\d+"):account_id>/<regex(".*"):queue_name>'
)
@@ -1,5 +1,4 @@
import pytest
import sure # noqa # pylint: disable=unused-import
from freezegun import freeze_time

from moto.core.utils import (
@@ -21,7 +20,7 @@ from moto.core.utils import (
],
)
def test_camelcase_to_underscores(_input, expected):
camelcase_to_underscores(_input).should.equal(expected)
assert camelcase_to_underscores(_input) == expected


@pytest.mark.parametrize(
@@ -29,7 +28,7 @@ def test_camelcase_to_underscores(_input, expected):
[("the_new_attribute", "theNewAttribute"), ("attribute", "attribute")],
)
def test_underscores_to_camelcase(_input, expected):
underscores_to_camelcase(_input).should.equal(expected)
assert underscores_to_camelcase(_input) == expected


@pytest.mark.parametrize(
@@ -37,7 +36,7 @@ def test_underscores_to_camelcase(_input, expected):
[("TheNewAttribute", "theNewAttribute"), ("Attribute", "attribute")],
)
def test_pascal_to_camelcase(_input, expected):
pascal_to_camelcase(_input).should.equal(expected)
assert pascal_to_camelcase(_input) == expected


@pytest.mark.parametrize(
@@ -45,9 +44,9 @@ def test_pascal_to_camelcase(_input, expected):
[("theNewAttribute", "TheNewAttribute"), ("attribute", "Attribute")],
)
def test_camelcase_to_pascal(_input, expected):
camelcase_to_pascal(_input).should.equal(expected)
assert camelcase_to_pascal(_input) == expected


@freeze_time("2015-01-01 12:00:00")
def test_unix_time():
unix_time().should.equal(1420113600.0)
assert unix_time() == 1420113600.0