Convert fixtures/exceptions to Pytest

This commit is contained in:
Bert Blommers 2020-11-11 15:54:01 +00:00
parent 6d364dc7aa
commit cb6731f340
16 changed files with 119 additions and 170 deletions

View File

@ -2,4 +2,3 @@ pytest
pytest-cov
sure==1.4.11
freezegun
parameterized>=0.7.0

View File

@ -407,8 +407,8 @@ def test_operations_with_invalid_tags():
DomainName="example.com",
Tags=[{"Key": "X" * 200, "Value": "Valid"}],
)
ex.exception.response["Error"]["Code"].should.equal("ValidationException")
ex.exception.response["Error"]["Message"].should.contain(
ex.value.response["Error"]["Code"].should.equal("ValidationException")
ex.value.response["Error"]["Message"].should.contain(
"Member must have length less than or equal to 128"
)
@ -424,8 +424,8 @@ def test_operations_with_invalid_tags():
],
)
ex.exception.response["Error"]["Code"].should.equal("ValidationException")
ex.exception.response["Error"]["Message"].should.contain(
ex.value.response["Error"]["Code"].should.equal("ValidationException")
ex.value.response["Error"]["Message"].should.contain(
"Member must have length less than or equal to 256"
)
@ -437,8 +437,8 @@ def test_operations_with_invalid_tags():
CertificateArn=arn,
Tags=[{"Key": "aws:xxx", "Value": "Valid"}, {"Key": "key2"}],
)
ex.exception.response["Error"]["Code"].should.equal("ValidationException")
ex.exception.response["Error"]["Message"].should.contain(
ex.value.response["Error"]["Code"].should.equal("ValidationException")
ex.value.response["Error"]["Message"].should.contain(
"AWS internal tags cannot be changed with this API"
)
@ -447,8 +447,8 @@ def test_operations_with_invalid_tags():
client.remove_tags_from_certificate(
CertificateArn=arn, Tags=[{"Key": "aws:xxx", "Value": "Valid"}]
)
ex.exception.response["Error"]["Code"].should.equal("ValidationException")
ex.exception.response["Error"]["Message"].should.contain(
ex.value.response["Error"]["Code"].should.equal("ValidationException")
ex.value.response["Error"]["Message"].should.contain(
"AWS internal tags cannot be changed with this API"
)
@ -464,8 +464,8 @@ def test_add_too_many_tags():
CertificateArn=arn,
Tags=[{"Key": "a-%d" % i, "Value": "abcd"} for i in range(1, 52)],
)
ex.exception.response["Error"]["Code"].should.equal("TooManyTagsException")
ex.exception.response["Error"]["Message"].should.contain("contains too many Tags")
ex.value.response["Error"]["Code"].should.equal("TooManyTagsException")
ex.value.response["Error"]["Message"].should.contain("contains too many Tags")
client.list_tags_for_certificate(CertificateArn=arn)["Tags"].should.have.empty
# Add 49 tags first, then try to add 2 more.
@ -481,10 +481,10 @@ def test_add_too_many_tags():
CertificateArn=arn,
Tags=[{"Key": "x-1", "Value": "xyz"}, {"Key": "x-2", "Value": "xyz"}],
)
ex.exception.response["Error"]["Code"].should.equal("TooManyTagsException")
ex.exception.response["Error"]["Message"].should.contain("contains too many Tags")
ex.exception.response["Error"]["Message"].count("pqrs").should.equal(49)
ex.exception.response["Error"]["Message"].count("xyz").should.equal(2)
ex.value.response["Error"]["Code"].should.equal("TooManyTagsException")
ex.value.response["Error"]["Message"].should.contain("contains too many Tags")
ex.value.response["Error"]["Message"].count("pqrs").should.equal(49)
ex.value.response["Error"]["Message"].count("xyz").should.equal(2)
client.list_tags_for_certificate(CertificateArn=arn)["Tags"].should.have.length_of(
49
)
@ -502,19 +502,20 @@ def test_request_certificate_no_san():
# Also tests the SAN code
@freeze_time("2012-01-01 12:00:00", as_arg=True)
@mock_acm
def test_request_certificate_issued_status(frozen_time):
def test_request_certificate_issued_status():
# After requesting a certificate, it should then auto-validate after 1 minute
# Some sneaky programming for that ;-)
client = boto3.client("acm", region_name="eu-central-1")
with freeze_time("2012-01-01 12:00:00"):
resp = client.request_certificate(
DomainName="google.com",
SubjectAlternativeNames=["google.com", "www.google.com", "mail.google.com"],
)
arn = resp["CertificateArn"]
with freeze_time("2012-01-01 12:00:00"):
resp = client.describe_certificate(CertificateArn=arn)
resp["Certificate"]["CertificateArn"].should.equal(arn)
resp["Certificate"]["DomainName"].should.equal("google.com")
@ -525,21 +526,21 @@ def test_request_certificate_issued_status(frozen_time):
len(resp["Certificate"]["SubjectAlternativeNames"]).should.equal(3)
# validation will be pending for 1 minute.
with freeze_time("2012-01-01 12:00:00"):
resp = client.describe_certificate(CertificateArn=arn)
resp["Certificate"]["CertificateArn"].should.equal(arn)
resp["Certificate"]["Status"].should.equal("PENDING_VALIDATION")
if not settings.TEST_SERVER_MODE:
# Move time to get it issued.
frozen_time.move_to("2012-01-01 12:02:00")
with freeze_time("2012-01-01 12:02:00"):
resp = client.describe_certificate(CertificateArn=arn)
resp["Certificate"]["CertificateArn"].should.equal(arn)
resp["Certificate"]["Status"].should.equal("ISSUED")
@freeze_time("2012-01-01 12:00:00", as_arg=True)
@mock_acm
def test_request_certificate_with_mutiple_times(frozen_time):
def test_request_certificate_with_mutiple_times():
if settings.TEST_SERVER_MODE:
raise SkipTest("Cant manipulate time in server mode")
@ -547,6 +548,7 @@ def test_request_certificate_with_mutiple_times(frozen_time):
# Some sneaky programming for that ;-)
client = boto3.client("acm", region_name="eu-central-1")
with freeze_time("2012-01-01 12:00:00"):
resp = client.request_certificate(
IdempotencyToken="test_token",
DomainName="google.com",
@ -561,7 +563,7 @@ def test_request_certificate_with_mutiple_times(frozen_time):
"2012-01-01 12:30:00",
"2012-01-01 12:45:00",
):
frozen_time.move_to(time_intervals)
with freeze_time(time_intervals):
resp = client.request_certificate(
IdempotencyToken="test_token",
DomainName="google.com",
@ -571,7 +573,7 @@ def test_request_certificate_with_mutiple_times(frozen_time):
arn.should.equal(original_arn)
# Move time
frozen_time.move_to("2012-01-01 13:01:00")
with freeze_time("2012-01-01 13:01:00"):
resp = client.request_certificate(
IdempotencyToken="test_token",
DomainName="google.com",

View File

@ -7,7 +7,6 @@ from botocore.exceptions import ParamValidationError
import pytest
import sure # noqa
from botocore.exceptions import ClientError
from parameterized import parameterized
from .test_applicationautoscaling import register_scalable_target
DEFAULT_REGION = "us-east-1"
@ -106,7 +105,7 @@ def test_register_scalable_target_ecs_with_non_existent_service_should_return_va
err.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
@parameterized(
@pytest.mark.parametrize("namespace,r_id,dimension,expected",
[
("ecs", "service/default/test-svc", "ecs:service:DesiredCount", True),
("ecs", "banana/default/test-svc", "ecs:service:DesiredCount", False),

View File

@ -738,7 +738,8 @@ def test_submit_job():
else:
raise RuntimeError("Batch job timed out")
resp = logs_client.describe_log_streams(logGroupName="/aws/batch/job")
resp = logs_client.describe_log_streams(logGroupName="/aws/batch/job",
logStreamNamePrefix="sayhellotomylittlefriend")
len(resp["logStreams"]).should.equal(1)
ls_name = resp["logStreams"][0]["logStreamName"]

View File

@ -1,9 +1,10 @@
import pytest
from moto.dynamodb2.exceptions import IncorrectOperandType, IncorrectDataType
from moto.dynamodb2.models import Item, DynamoType
from moto.dynamodb2.parsing.executors import UpdateExpressionExecutor
from moto.dynamodb2.parsing.expressions import UpdateExpressionParser
from moto.dynamodb2.parsing.validators import UpdateExpressionValidator
from parameterized import parameterized
def test_execution_of_if_not_exists_not_existing_value():
@ -405,7 +406,7 @@ def test_execution_of_add_to_a_set():
assert expected_item == item
@parameterized(
@pytest.mark.parametrize("expression_attribute_values,unexpected_data_type",
[
(
{":value": {"S": "10"}},

View File

@ -1,3 +1,5 @@
import pytest
from moto.dynamodb2.exceptions import (
AttributeIsReservedKeyword,
ExpressionAttributeValueNotDefined,
@ -10,12 +12,10 @@ from moto.dynamodb2.models import Item, DynamoType
from moto.dynamodb2.parsing.ast_nodes import (
NodeDepthLeftTypeFetcher,
UpdateExpressionSetAction,
UpdateExpressionValue,
DDBTypedValue,
)
from moto.dynamodb2.parsing.expressions import UpdateExpressionParser
from moto.dynamodb2.parsing.validators import UpdateExpressionValidator
from parameterized import parameterized
def test_validation_of_update_expression_with_keyword():
@ -41,7 +41,7 @@ def test_validation_of_update_expression_with_keyword():
assert e.keyword == "path"
@parameterized(
@pytest.mark.parametrize("update_expression",
[
"SET a = #b + :val2",
"SET a = :val2 + #b",
@ -101,7 +101,7 @@ def test_validation_of_update_expression_with_attribute_that_does_not_exist_in_i
assert True
@parameterized(
@pytest.mark.parametrize("update_expression",
[
"SET a = #c",
"SET a = #c + #d",

View File

@ -428,7 +428,7 @@ def test_create_subnet_with_invalid_cidr_range_multiple_vpc_cidr_blocks():
subnet_cidr_block = "10.2.0.0/20"
with pytest.raises(ClientError) as ex:
subnet = ec2.create_subnet(VpcId=vpc.id, CidrBlock=subnet_cidr_block)
str(ex.exception).should.equal(
str(ex.value).should.equal(
"An error occurred (InvalidSubnet.Range) when calling the CreateSubnet "
"operation: The CIDR '{}' is invalid.".format(subnet_cidr_block)
)

View File

@ -5,6 +5,7 @@ from copy import deepcopy
from datetime import datetime
import boto3
import json
import pytz
import six
import sure # noqa
@ -803,11 +804,8 @@ def test_instance_groups():
x["AutoScalingPolicy"]["Status"]["State"].should.equal("ATTACHED")
returned_policy = dict(x["AutoScalingPolicy"])
del returned_policy["Status"]
for dimension in y["AutoScalingPolicy"]["Rules"]["Trigger"][
"CloudWatchAlarmDefinition"
]["Dimensions"]:
dimension["Value"] = cluster_id
returned_policy.should.equal(y["AutoScalingPolicy"])
policy = json.loads(json.dumps(y["AutoScalingPolicy"]).replace("${emr.clusterId}", cluster_id))
returned_policy.should.equal(policy)
if "EbsConfiguration" in y:
_do_assertion_ebs_configuration(x, y)
# Configurations

View File

@ -6,7 +6,6 @@ import sure # noqa
from botocore.exceptions import ClientError
from moto import mock_forecast
from moto.core import ACCOUNT_ID
from parameterized import parameterized
region = "us-east-1"
account_id = None
@ -21,7 +20,7 @@ valid_domains = [
]
@parameterized(valid_domains)
@pytest.mark.parametrize("domain", valid_domains)
@mock_forecast
def test_forecast_dataset_group_create(domain):
name = "example_dataset_group"
@ -41,23 +40,23 @@ def test_forecast_dataset_group_create_invalid_domain():
with pytest.raises(ClientError) as exc:
client.create_dataset_group(DatasetGroupName=name, Domain=invalid_domain)
exc.exception.response["Error"]["Code"].should.equal("ValidationException")
exc.exception.response["Error"]["Message"].should.equal(
exc.value.response["Error"]["Code"].should.equal("ValidationException")
exc.value.response["Error"]["Message"].should.equal(
"1 validation error detected: Value '"
+ invalid_domain
+ "' at 'domain' failed to satisfy constraint: Member must satisfy enum value set ['INVENTORY_PLANNING', 'METRICS', 'RETAIL', 'EC2_CAPACITY', 'CUSTOM', 'WEB_TRAFFIC', 'WORK_FORCE']"
)
@parameterized([" ", "a" * 64])
@pytest.mark.parametrize("name", [" ", "a" * 64])
@mock_forecast
def test_forecast_dataset_group_create_invalid_name(name):
client = boto3.client("forecast", region_name=region)
with pytest.raises(ClientError) as exc:
client.create_dataset_group(DatasetGroupName=name, Domain="CUSTOM")
exc.exception.response["Error"]["Code"].should.equal("ValidationException")
exc.exception.response["Error"]["Message"].should.contain(
exc.value.response["Error"]["Code"].should.equal("ValidationException")
exc.value.response["Error"]["Message"].should.contain(
"1 validation error detected: Value '"
+ name
+ "' at 'datasetGroupName' failed to satisfy constraint: Member must"
@ -72,7 +71,7 @@ def test_forecast_dataset_group_create_duplicate_fails():
with pytest.raises(ClientError) as exc:
client.create_dataset_group(DatasetGroupName="name", Domain="RETAIL")
exc.exception.response["Error"]["Code"].should.equal(
exc.value.response["Error"]["Code"].should.equal(
"ResourceAlreadyExistsException"
)
@ -123,8 +122,8 @@ def test_forecast_delete_dataset_group_missing():
with pytest.raises(ClientError) as exc:
client.delete_dataset_group(DatasetGroupArn=missing_dsg_arn)
exc.exception.response["Error"]["Code"].should.equal("ResourceNotFoundException")
exc.exception.response["Error"]["Message"].should.equal(
exc.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
exc.value.response["Error"]["Message"].should.equal(
"No resource found " + missing_dsg_arn
)
@ -153,8 +152,8 @@ def test_forecast_update_dataset_group_not_found():
)
with pytest.raises(ClientError) as exc:
client.update_dataset_group(DatasetGroupArn=dataset_group_arn, DatasetArns=[])
exc.exception.response["Error"]["Code"].should.equal("ResourceNotFoundException")
exc.exception.response["Error"]["Message"].should.equal(
exc.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
exc.value.response["Error"]["Message"].should.equal(
"No resource found " + dataset_group_arn
)
@ -181,8 +180,8 @@ def test_describe_dataset_group_missing():
)
with pytest.raises(ClientError) as exc:
client.describe_dataset_group(DatasetGroupArn=dataset_group_arn)
exc.exception.response["Error"]["Code"].should.equal("ResourceNotFoundException")
exc.exception.response["Error"]["Message"].should.equal(
exc.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
exc.value.response["Error"]["Message"].should.equal(
"No resource found " + dataset_group_arn
)
@ -195,8 +194,8 @@ def test_create_dataset_group_missing_datasets():
client.create_dataset_group(
DatasetGroupName="name", Domain="CUSTOM", DatasetArns=[dataset_arn]
)
exc.exception.response["Error"]["Code"].should.equal("InvalidInputException")
exc.exception.response["Error"]["Message"].should.equal(
exc.value.response["Error"]["Code"].should.equal("InvalidInputException")
exc.value.response["Error"]["Message"].should.equal(
"Dataset arns: [" + dataset_arn + "] are not found"
)
@ -215,7 +214,7 @@ def test_update_dataset_group_missing_datasets():
client.update_dataset_group(
DatasetGroupArn=dataset_group_arn, DatasetArns=[dataset_arn]
)
exc.exception.response["Error"]["Code"].should.equal("InvalidInputException")
exc.exception.response["Error"]["Message"].should.equal(
exc.value.response["Error"]["Code"].should.equal("InvalidInputException")
exc.value.response["Error"]["Message"].should.equal(
"Dataset arns: [" + dataset_arn + "] are not found"
)

View File

@ -10,17 +10,12 @@ import sure # noqa
from boto.exception import JSONResponseError
from boto.kms.exceptions import AlreadyExistsException, NotFoundException
import pytest
from parameterized import parameterized
from moto.core.exceptions import JsonRESTError
from moto.kms.models import KmsBackend
from moto.kms.exceptions import NotFoundException as MotoNotFoundException
from moto import mock_kms_deprecated, mock_kms
PLAINTEXT_VECTORS = (
(b"some encodeable plaintext",),
(b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16",),
("some unicode characters ø˚∆øˆˆ∆ßçøˆˆçßøˆ¨¥",),
)
PLAINTEXT_VECTORS = [b"some encodeable plaintext", b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16", "some unicode characters ø˚∆øˆˆ∆ßçøˆˆçßøˆ¨¥"]
def _get_encoded_value(plaintext):
@ -495,7 +490,7 @@ def test__create_alias__raises_if_alias_has_colon_character():
ex.status.should.equal(400)
@parameterized((("alias/my-alias_/",), ("alias/my_alias-/",)))
@pytest.mark.parametrize("alias_name", ["alias/my-alias_/", "alias/my_alias-/"])
@mock_kms_deprecated
def test__create_alias__accepted_characters(alias_name):
kms = boto.connect_kms()

View File

@ -11,15 +11,12 @@ import six
import sure # noqa
from freezegun import freeze_time
import pytest
from parameterized import parameterized
from moto import mock_kms
PLAINTEXT_VECTORS = (
(b"some encodeable plaintext",),
(b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16",),
("some unicode characters ø˚∆øˆˆ∆ßçøˆˆçßøˆ¨¥",),
)
PLAINTEXT_VECTORS = [b"some encodeable plaintext",
b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16",
"some unicode characters ø˚∆øˆˆ∆ßçøˆˆçßøˆ¨¥"]
def _get_encoded_value(plaintext):
@ -132,13 +129,7 @@ def test_describe_key():
response["KeyMetadata"].should_not.have.key("SigningAlgorithms")
@parameterized(
(
("alias/does-not-exist",),
("arn:aws:kms:us-east-1:012345678912:alias/does-not-exist",),
("invalid",),
)
)
@pytest.mark.parametrize("key_id", ["alias/does-not-exist", "arn:aws:kms:us-east-1:012345678912:alias/does-not-exist", "invalid"])
@mock_kms
def test_describe_key_via_alias_invalid_alias(key_id):
client = boto3.client("kms", region_name="us-east-1")
@ -168,7 +159,7 @@ def test_generate_data_key():
response["KeyId"].should.equal(key_arn)
@parameterized(PLAINTEXT_VECTORS)
@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS)
@mock_kms
def test_encrypt(plaintext):
client = boto3.client("kms", region_name="us-west-2")
@ -187,7 +178,7 @@ def test_encrypt(plaintext):
response["KeyId"].should.equal(key_arn)
@parameterized(PLAINTEXT_VECTORS)
@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS)
@mock_kms
def test_decrypt(plaintext):
client = boto3.client("kms", region_name="us-west-2")
@ -213,16 +204,8 @@ def test_decrypt(plaintext):
decrypt_response["KeyId"].should.equal(key_arn)
@parameterized(
(
("not-a-uuid",),
("alias/DoesNotExist",),
("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",),
("d25652e4-d2d2-49f7-929a-671ccda580c6",),
(
"arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6",
),
)
@pytest.mark.parametrize("key_id",
["not-a-uuid", "alias/DoesNotExist", "arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist", "d25652e4-d2d2-49f7-929a-671ccda580c6", "arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6"]
)
@mock_kms
def test_invalid_key_ids(key_id):
@ -232,7 +215,7 @@ def test_invalid_key_ids(key_id):
client.generate_data_key(KeyId=key_id, NumberOfBytes=5)
@parameterized(PLAINTEXT_VECTORS)
@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS)
@mock_kms
def test_kms_encrypt(plaintext):
client = boto3.client("kms", region_name="us-east-1")
@ -369,7 +352,7 @@ def test_list_resource_tags():
assert response["Tags"][0]["TagValue"] == "string"
@parameterized(
@pytest.mark.parametrize("kwargs,expected_key_length",
(
(dict(KeySpec="AES_256"), 32),
(dict(KeySpec="AES_128"), 16),
@ -401,14 +384,8 @@ def test_generate_data_key_decrypt():
assert resp1["Plaintext"] == resp2["Plaintext"]
@parameterized(
(
(dict(KeySpec="AES_257"),),
(dict(KeySpec="AES_128", NumberOfBytes=16),),
(dict(NumberOfBytes=2048),),
(dict(NumberOfBytes=0),),
(dict(),),
)
@pytest.mark.parametrize("kwargs",
[dict(KeySpec="AES_257"), dict(KeySpec="AES_128", NumberOfBytes=16), dict(NumberOfBytes=2048), dict(NumberOfBytes=0), dict()]
)
@mock_kms
def test_generate_data_key_invalid_size_params(kwargs):
@ -421,15 +398,8 @@ def test_generate_data_key_invalid_size_params(kwargs):
client.generate_data_key(KeyId=key["KeyMetadata"]["KeyId"], **kwargs)
@parameterized(
(
("alias/DoesNotExist",),
("arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist",),
("d25652e4-d2d2-49f7-929a-671ccda580c6",),
(
"arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6",
),
)
@pytest.mark.parametrize("key_id",
["alias/DoesNotExist", "arn:aws:kms:us-east-1:012345678912:alias/DoesNotExist", "d25652e4-d2d2-49f7-929a-671ccda580c6", "arn:aws:kms:us-east-1:012345678912:key/d25652e4-d2d2-49f7-929a-671ccda580c6"]
)
@mock_kms
def test_generate_data_key_invalid_key(key_id):
@ -439,13 +409,8 @@ def test_generate_data_key_invalid_key(key_id):
client.generate_data_key(KeyId=key_id, KeySpec="AES_256")
@parameterized(
(
("alias/DoesExist", False),
("arn:aws:kms:us-east-1:012345678912:alias/DoesExist", False),
("", True),
("arn:aws:kms:us-east-1:012345678912:key/", True),
)
@pytest.mark.parametrize("prefix,append_key_id",
[("alias/DoesExist", False), ("arn:aws:kms:us-east-1:012345678912:alias/DoesExist", False), ("", True), ("arn:aws:kms:us-east-1:012345678912:key/", True)]
)
@mock_kms
def test_generate_data_key_all_valid_key_ids(prefix, append_key_id):
@ -473,7 +438,7 @@ def test_generate_data_key_without_plaintext_decrypt():
assert "Plaintext" not in resp1
@parameterized(PLAINTEXT_VECTORS)
@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS)
@mock_kms
def test_re_encrypt_decrypt(plaintext):
client = boto3.client("kms", region_name="us-west-2")
@ -536,7 +501,7 @@ def test_re_encrypt_to_invalid_destination():
)
@parameterized(((12,), (44,), (91,), (1,), (1024,)))
@pytest.mark.parametrize("number_of_bytes", [12, 44, 91, 1, 1024])
@mock_kms
def test_generate_random(number_of_bytes):
client = boto3.client("kms", region_name="us-west-2")
@ -547,14 +512,8 @@ def test_generate_random(number_of_bytes):
len(response["Plaintext"]).should.equal(number_of_bytes)
@parameterized(
(
(2048, botocore.exceptions.ClientError),
(1025, botocore.exceptions.ClientError),
(0, botocore.exceptions.ParamValidationError),
(-1, botocore.exceptions.ParamValidationError),
(-1024, botocore.exceptions.ParamValidationError),
)
@pytest.mark.parametrize("number_of_bytes,error_type",
[(2048, botocore.exceptions.ClientError), (1025, botocore.exceptions.ClientError), (0, botocore.exceptions.ParamValidationError), (-1, botocore.exceptions.ParamValidationError), (-1024, botocore.exceptions.ParamValidationError)]
)
@mock_kms
def test_generate_random_invalid_number_of_bytes(number_of_bytes, error_type):

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
import sure # noqa
import pytest
from parameterized import parameterized
from moto.kms.exceptions import (
AccessDeniedException,
@ -22,7 +21,7 @@ from moto.kms.utils import (
Ciphertext,
)
ENCRYPTION_CONTEXT_VECTORS = (
ENCRYPTION_CONTEXT_VECTORS = [
(
{"this": "is", "an": "encryption", "context": "example"},
b"an" b"encryption" b"context" b"example" b"this" b"is",
@ -31,8 +30,8 @@ ENCRYPTION_CONTEXT_VECTORS = (
{"a_this": "one", "b_is": "actually", "c_in": "order"},
b"a_this" b"one" b"b_is" b"actually" b"c_in" b"order",
),
)
CIPHERTEXT_BLOB_VECTORS = (
]
CIPHERTEXT_BLOB_VECTORS = [
(
Ciphertext(
key_id="d25652e4-d2d2-49f7-929a-671ccda580c6",
@ -57,7 +56,7 @@ CIPHERTEXT_BLOB_VECTORS = (
b"1234567890123456"
b"some ciphertext that is much longer now",
),
)
]
def test_generate_data_key():
@ -74,32 +73,32 @@ def test_generate_master_key():
len(test).should.equal(MASTER_KEY_LEN)
@parameterized(ENCRYPTION_CONTEXT_VECTORS)
@pytest.mark.parametrize("raw,serialized", ENCRYPTION_CONTEXT_VECTORS)
def test_serialize_encryption_context(raw, serialized):
test = _serialize_encryption_context(raw)
test.should.equal(serialized)
@parameterized(CIPHERTEXT_BLOB_VECTORS)
@pytest.mark.parametrize("raw,_serialized", CIPHERTEXT_BLOB_VECTORS)
def test_cycle_ciphertext_blob(raw, _serialized):
test_serialized = _serialize_ciphertext_blob(raw)
test_deserialized = _deserialize_ciphertext_blob(test_serialized)
test_deserialized.should.equal(raw)
@parameterized(CIPHERTEXT_BLOB_VECTORS)
@pytest.mark.parametrize("raw,serialized", CIPHERTEXT_BLOB_VECTORS)
def test_serialize_ciphertext_blob(raw, serialized):
test = _serialize_ciphertext_blob(raw)
test.should.equal(serialized)
@parameterized(CIPHERTEXT_BLOB_VECTORS)
@pytest.mark.parametrize("raw,serialized", CIPHERTEXT_BLOB_VECTORS)
def test_deserialize_ciphertext_blob(raw, serialized):
test = _deserialize_ciphertext_blob(serialized)
test.should.equal(raw)
@parameterized(((ec[0],) for ec in ENCRYPTION_CONTEXT_VECTORS))
@pytest.mark.parametrize("encryption_context", [ec[0] for ec in ENCRYPTION_CONTEXT_VECTORS])
def test_encrypt_decrypt_cycle(encryption_context):
plaintext = b"some secret plaintext"
master_key = Key("nop", "nop", "nop", "nop", "nop")

View File

@ -24,7 +24,6 @@ from botocore.handlers import disable_signing
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from freezegun import freeze_time
from parameterized import parameterized
import six
import requests
from moto.s3.responses import DEFAULT_REGION_NAME
@ -426,7 +425,7 @@ def test_copy_key():
bucket.get_key("new-key").get_contents_as_string().should.equal(b"some value")
@parameterized([("the-unicode-💩-key",), ("key-with?question-mark",)])
@pytest.mark.parametrize("key_name", ["the-unicode-💩-key", "key-with?question-mark"])
@mock_s3_deprecated
def test_copy_key_with_special_chars(key_name):
conn = boto.connect_s3("the_key", "the_secret")
@ -4016,7 +4015,7 @@ def test_root_dir_with_empty_name_works():
store_and_read_back_a_key("/")
@parameterized(["mybucket", "my.bucket"])
@pytest.mark.parametrize("bucket_name", ["mybucket", "my.bucket"])
@mock_s3
def test_leading_slashes_not_removed(bucket_name):
"""Make sure that leading slashes are not removed internally."""
@ -4038,8 +4037,8 @@ def test_leading_slashes_not_removed(bucket_name):
e.value.response["Error"]["Code"].should.equal("NoSuchKey")
@parameterized(
[("foo/bar/baz",), ("foo",), ("foo/run_dt%3D2019-01-01%252012%253A30%253A00",)]
@pytest.mark.parametrize("key",
["foo/bar/baz", "foo", "foo/run_dt%3D2019-01-01%252012%253A30%253A00"]
)
@mock_s3
def test_delete_objects_with_url_encoded_key(key):

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import os
import pytest
from sure import expect
from moto.s3.utils import (
bucket_name_from_url,
@ -8,7 +9,6 @@ from moto.s3.utils import (
clean_key_name,
undo_clean_key_name,
)
from parameterized import parameterized
def test_base_url():
@ -93,7 +93,7 @@ def test_parse_region_from_url():
parse_region_from_url(url).should.equal(expected)
@parameterized(
@pytest.mark.parametrize("key,expected",
[
("foo/bar/baz", "foo/bar/baz"),
("foo", "foo"),
@ -107,7 +107,7 @@ def test_clean_key_name(key, expected):
clean_key_name(key).should.equal(expected)
@parameterized(
@pytest.mark.parametrize("key,expected",
[
("foo/bar/baz", "foo/bar/baz"),
("foo", "foo"),

View File

@ -638,9 +638,7 @@ def test_put_secret_value_on_non_existing_secret():
VersionStages=["AWSCURRENT"],
)
assert \
"Secrets Manager can't find the specified secret." == \
cm.exception.response["Error"]["Message"]
cm.value.response["Error"]["Message"].should.equal("Secrets Manager can't find the specified secret.")
@mock_secretsmanager
@ -954,7 +952,7 @@ def test_tag_resource():
assert \
"Secrets Manager can't find the specified secret." == \
cm.exception.response["Error"]["Message"]
cm.value.response["Error"]["Message"]
@mock_secretsmanager

View File

@ -280,7 +280,7 @@ def test_message_with_invalid_attributes():
"öther_encodings": {"DataType": "String", "StringValue": "str"},
},
)
ex = e.exception
ex = e.value
ex.response["Error"]["Code"].should.equal("MessageAttributesInvalid")
ex.response["Error"]["Message"].should.equal(
"The message attribute name 'öther_encodings' is invalid. "
@ -2251,7 +2251,7 @@ def test_maximum_message_size_attribute_default():
int(queue.attributes["MaximumMessageSize"]).should.equal(MAXIMUM_MESSAGE_LENGTH)
with pytest.raises(Exception) as e:
queue.send_message(MessageBody="a" * (MAXIMUM_MESSAGE_LENGTH + 1))
ex = e.exception
ex = e.value
ex.response["Error"]["Code"].should.equal("InvalidParameterValue")
@ -2268,7 +2268,7 @@ def test_maximum_message_size_attribute_fails_for_invalid_values():
QueueName="test-queue",
Attributes={"MaximumMessageSize": str(message_size)},
)
ex = e.exception
ex = e.value
ex.response["Error"]["Code"].should.equal("InvalidAttributeValue")
@ -2283,7 +2283,7 @@ def test_send_message_fails_when_message_size_greater_than_max_message_size():
int(queue.attributes["MaximumMessageSize"]).should.equal(message_size_limit)
with pytest.raises(ClientError) as e:
queue.send_message(MessageBody="a" * (message_size_limit + 1))
ex = e.exception
ex = e.value
ex.response["Error"]["Code"].should.equal("InvalidParameterValue")
ex.response["Error"]["Message"].should.contain(
"{} bytes".format(message_size_limit)