Techdebt: Replace sure with regular assertions in KMS (#6590)
This commit is contained in:
parent
f69d0b2b5f
commit
b11a693b11
@ -9,7 +9,6 @@ import os
|
||||
|
||||
import boto3
|
||||
import botocore.exceptions
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
from botocore.exceptions import ClientError
|
||||
from freezegun import freeze_time
|
||||
import pytest
|
||||
@ -37,10 +36,10 @@ def test_create_key_without_description():
|
||||
conn = boto3.client("kms", region_name="us-east-1")
|
||||
metadata = conn.create_key(Policy="my policy")["KeyMetadata"]
|
||||
|
||||
metadata.should.have.key("AWSAccountId").equals(ACCOUNT_ID)
|
||||
metadata.should.have.key("KeyId")
|
||||
metadata.should.have.key("Arn")
|
||||
metadata.should.have.key("Description").equal("")
|
||||
assert metadata["AWSAccountId"] == ACCOUNT_ID
|
||||
assert "KeyId" in metadata
|
||||
assert "Arn" in metadata
|
||||
assert metadata["Description"] == ""
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -53,58 +52,56 @@ def test_create_key():
|
||||
Tags=[{"TagKey": "project", "TagValue": "moto"}],
|
||||
)
|
||||
|
||||
key["KeyMetadata"]["Arn"].should.equal(
|
||||
f"arn:aws:kms:us-east-1:{ACCOUNT_ID}:key/{key['KeyMetadata']['KeyId']}"
|
||||
assert (
|
||||
key["KeyMetadata"]["Arn"]
|
||||
== f"arn:aws:kms:us-east-1:{ACCOUNT_ID}:key/{key['KeyMetadata']['KeyId']}"
|
||||
)
|
||||
key["KeyMetadata"]["AWSAccountId"].should.equal(ACCOUNT_ID)
|
||||
key["KeyMetadata"]["CreationDate"].should.be.a(datetime)
|
||||
key["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT")
|
||||
key["KeyMetadata"]["KeySpec"].should.equal("SYMMETRIC_DEFAULT")
|
||||
key["KeyMetadata"]["Description"].should.equal("my key")
|
||||
key["KeyMetadata"]["Enabled"].should.equal(True)
|
||||
key["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"])
|
||||
key["KeyMetadata"]["KeyId"].should.match("[-a-zA-Z0-9]+")
|
||||
key["KeyMetadata"]["KeyManager"].should.equal("CUSTOMER")
|
||||
key["KeyMetadata"]["KeyState"].should.equal("Enabled")
|
||||
key["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT")
|
||||
key["KeyMetadata"]["Origin"].should.equal("AWS_KMS")
|
||||
key["KeyMetadata"].should_not.have.key("SigningAlgorithms")
|
||||
assert key["KeyMetadata"]["AWSAccountId"] == ACCOUNT_ID
|
||||
assert key["KeyMetadata"]["CustomerMasterKeySpec"] == "SYMMETRIC_DEFAULT"
|
||||
assert key["KeyMetadata"]["KeySpec"] == "SYMMETRIC_DEFAULT"
|
||||
assert key["KeyMetadata"]["Description"] == "my key"
|
||||
assert key["KeyMetadata"]["Enabled"] is True
|
||||
assert key["KeyMetadata"]["EncryptionAlgorithms"] == ["SYMMETRIC_DEFAULT"]
|
||||
assert key["KeyMetadata"]["KeyManager"] == "CUSTOMER"
|
||||
assert key["KeyMetadata"]["KeyState"] == "Enabled"
|
||||
assert key["KeyMetadata"]["KeyUsage"] == "ENCRYPT_DECRYPT"
|
||||
assert key["KeyMetadata"]["Origin"] == "AWS_KMS"
|
||||
assert "SigningAlgorithms" not in key["KeyMetadata"]
|
||||
|
||||
key = conn.create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="RSA_2048")
|
||||
|
||||
sorted(key["KeyMetadata"]["EncryptionAlgorithms"]).should.equal(
|
||||
["RSAES_OAEP_SHA_1", "RSAES_OAEP_SHA_256"]
|
||||
)
|
||||
key["KeyMetadata"].should_not.have.key("SigningAlgorithms")
|
||||
assert sorted(key["KeyMetadata"]["EncryptionAlgorithms"]) == [
|
||||
"RSAES_OAEP_SHA_1",
|
||||
"RSAES_OAEP_SHA_256",
|
||||
]
|
||||
assert "SigningAlgorithms" not in key["KeyMetadata"]
|
||||
|
||||
key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="RSA_2048")
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
sorted(key["KeyMetadata"]["SigningAlgorithms"]).should.equal(
|
||||
[
|
||||
"RSASSA_PKCS1_V1_5_SHA_256",
|
||||
"RSASSA_PKCS1_V1_5_SHA_384",
|
||||
"RSASSA_PKCS1_V1_5_SHA_512",
|
||||
"RSASSA_PSS_SHA_256",
|
||||
"RSASSA_PSS_SHA_384",
|
||||
"RSASSA_PSS_SHA_512",
|
||||
]
|
||||
)
|
||||
assert "EncryptionAlgorithms" not in key["KeyMetadata"]
|
||||
assert sorted(key["KeyMetadata"]["SigningAlgorithms"]) == [
|
||||
"RSASSA_PKCS1_V1_5_SHA_256",
|
||||
"RSASSA_PKCS1_V1_5_SHA_384",
|
||||
"RSASSA_PKCS1_V1_5_SHA_512",
|
||||
"RSASSA_PSS_SHA_256",
|
||||
"RSASSA_PSS_SHA_384",
|
||||
"RSASSA_PSS_SHA_512",
|
||||
]
|
||||
|
||||
key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_SECG_P256K1")
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_256"])
|
||||
assert "EncryptionAlgorithms" not in key["KeyMetadata"]
|
||||
assert key["KeyMetadata"]["SigningAlgorithms"] == ["ECDSA_SHA_256"]
|
||||
|
||||
key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_NIST_P384")
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_384"])
|
||||
assert "EncryptionAlgorithms" not in key["KeyMetadata"]
|
||||
assert key["KeyMetadata"]["SigningAlgorithms"] == ["ECDSA_SHA_384"]
|
||||
|
||||
key = conn.create_key(KeyUsage="SIGN_VERIFY", KeySpec="ECC_NIST_P521")
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_512"])
|
||||
assert "EncryptionAlgorithms" not in key["KeyMetadata"]
|
||||
assert key["KeyMetadata"]["SigningAlgorithms"] == ["ECDSA_SHA_512"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -118,8 +115,8 @@ def test_create_multi_region_key():
|
||||
Tags=[{"TagKey": "project", "TagValue": "moto"}],
|
||||
)
|
||||
|
||||
key["KeyMetadata"]["KeyId"].should.match("^mrk-")
|
||||
key["KeyMetadata"]["MultiRegion"].should.equal(True)
|
||||
assert key["KeyMetadata"]["KeyId"].startswith("mrk-")
|
||||
assert key["KeyMetadata"]["MultiRegion"] is True
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -133,8 +130,8 @@ def test_non_multi_region_keys_should_not_have_multi_region_properties():
|
||||
Tags=[{"TagKey": "project", "TagValue": "moto"}],
|
||||
)
|
||||
|
||||
key["KeyMetadata"]["KeyId"].should_not.match("^mrk-")
|
||||
key["KeyMetadata"]["MultiRegion"].should.equal(False)
|
||||
assert not key["KeyMetadata"]["KeyId"].startswith("mrk-")
|
||||
assert key["KeyMetadata"]["MultiRegion"] is False
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -169,11 +166,11 @@ def test_create_key_deprecated_master_custom_key_spec():
|
||||
conn = boto3.client("kms", region_name="us-east-1")
|
||||
key = conn.create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P521")
|
||||
|
||||
key["KeyMetadata"].should_not.have.key("EncryptionAlgorithms")
|
||||
key["KeyMetadata"]["SigningAlgorithms"].should.equal(["ECDSA_SHA_512"])
|
||||
assert "EncryptionAlgorithms" not in key["KeyMetadata"]
|
||||
assert key["KeyMetadata"]["SigningAlgorithms"] == ["ECDSA_SHA_512"]
|
||||
|
||||
key["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("ECC_NIST_P521")
|
||||
key["KeyMetadata"]["KeySpec"].should.equal("ECC_NIST_P521")
|
||||
assert key["KeyMetadata"]["CustomerMasterKeySpec"] == "ECC_NIST_P521"
|
||||
assert key["KeyMetadata"]["KeySpec"] == "ECC_NIST_P521"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("id_or_arn", ["KeyId", "Arn"])
|
||||
@ -185,19 +182,18 @@ def test_describe_key(id_or_arn):
|
||||
|
||||
response = client.describe_key(KeyId=key_id)
|
||||
|
||||
response["KeyMetadata"]["AWSAccountId"].should.equal("123456789012")
|
||||
response["KeyMetadata"]["CreationDate"].should.be.a(datetime)
|
||||
response["KeyMetadata"]["CustomerMasterKeySpec"].should.equal("SYMMETRIC_DEFAULT")
|
||||
response["KeyMetadata"]["KeySpec"].should.equal("SYMMETRIC_DEFAULT")
|
||||
response["KeyMetadata"]["Description"].should.equal("my key")
|
||||
response["KeyMetadata"]["Enabled"].should.equal(True)
|
||||
response["KeyMetadata"]["EncryptionAlgorithms"].should.equal(["SYMMETRIC_DEFAULT"])
|
||||
response["KeyMetadata"]["KeyId"].should.match("[-a-zA-Z0-9]+")
|
||||
response["KeyMetadata"]["KeyManager"].should.equal("CUSTOMER")
|
||||
response["KeyMetadata"]["KeyState"].should.equal("Enabled")
|
||||
response["KeyMetadata"]["KeyUsage"].should.equal("ENCRYPT_DECRYPT")
|
||||
response["KeyMetadata"]["Origin"].should.equal("AWS_KMS")
|
||||
response["KeyMetadata"].should_not.have.key("SigningAlgorithms")
|
||||
assert response["KeyMetadata"]["AWSAccountId"] == "123456789012"
|
||||
assert response["KeyMetadata"]["CreationDate"] is not None
|
||||
assert response["KeyMetadata"]["CustomerMasterKeySpec"] == "SYMMETRIC_DEFAULT"
|
||||
assert response["KeyMetadata"]["KeySpec"] == "SYMMETRIC_DEFAULT"
|
||||
assert response["KeyMetadata"]["Description"] == "my key"
|
||||
assert response["KeyMetadata"]["Enabled"] is True
|
||||
assert response["KeyMetadata"]["EncryptionAlgorithms"] == ["SYMMETRIC_DEFAULT"]
|
||||
assert response["KeyMetadata"]["KeyManager"] == "CUSTOMER"
|
||||
assert response["KeyMetadata"]["KeyState"] == "Enabled"
|
||||
assert response["KeyMetadata"]["KeyUsage"] == "ENCRYPT_DECRYPT"
|
||||
assert response["KeyMetadata"]["Origin"] == "AWS_KMS"
|
||||
assert "SigningAlgorithms" not in response["KeyMetadata"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -210,21 +206,19 @@ def test_get_key_policy_default():
|
||||
policy = client.get_key_policy(KeyId=key_id, PolicyName="default")["Policy"]
|
||||
|
||||
# then
|
||||
json.loads(policy).should.equal(
|
||||
{
|
||||
"Version": "2012-10-17",
|
||||
"Id": "key-default-1",
|
||||
"Statement": [
|
||||
{
|
||||
"Sid": "Enable IAM User Permissions",
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
|
||||
"Action": "kms:*",
|
||||
"Resource": "*",
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
assert json.loads(policy) == {
|
||||
"Version": "2012-10-17",
|
||||
"Id": "key-default-1",
|
||||
"Statement": [
|
||||
{
|
||||
"Sid": "Enable IAM User Permissions",
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
|
||||
"Action": "kms:*",
|
||||
"Resource": "*",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -235,7 +229,7 @@ def test_describe_key_via_alias():
|
||||
client.create_alias(AliasName="alias/my-alias", TargetKeyId=key_id)
|
||||
|
||||
alias_key = client.describe_key(KeyId="alias/my-alias")
|
||||
alias_key["KeyMetadata"]["Description"].should.equal("my key")
|
||||
assert alias_key["KeyMetadata"]["Description"] == "my key"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -251,9 +245,11 @@ def test__create_alias__can_create_multiple_aliases_for_same_key_id():
|
||||
|
||||
for name in alias_names:
|
||||
alias_arn = f"arn:aws:kms:us-east-1:{ACCOUNT_ID}:{name}"
|
||||
aliases.should.contain(
|
||||
{"AliasName": name, "AliasArn": alias_arn, "TargetKeyId": key_id}
|
||||
)
|
||||
assert {
|
||||
"AliasName": name,
|
||||
"AliasArn": alias_arn,
|
||||
"TargetKeyId": key_id,
|
||||
} in aliases
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -271,14 +267,16 @@ def test_list_aliases():
|
||||
default_alias_names = list(default_alias_target_keys.keys())
|
||||
|
||||
aliases = client.list_aliases()["Aliases"]
|
||||
aliases.should.have.length_of(14)
|
||||
assert len(aliases) == 14
|
||||
for name in default_alias_names:
|
||||
full_name = f"alias/{name}"
|
||||
arn = f"arn:aws:kms:{region}:{ACCOUNT_ID}:{full_name}"
|
||||
target_key_id = default_alias_target_keys[name]
|
||||
aliases.should.contain(
|
||||
{"AliasName": full_name, "AliasArn": arn, "TargetKeyId": target_key_id}
|
||||
)
|
||||
assert {
|
||||
"AliasName": full_name,
|
||||
"AliasArn": arn,
|
||||
"TargetKeyId": target_key_id,
|
||||
} in aliases
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -292,10 +290,12 @@ def test_list_aliases_for_key_id():
|
||||
client.create_alias(AliasName=my_alias, TargetKeyId=key_id)
|
||||
|
||||
aliases = client.list_aliases(KeyId=key_id)["Aliases"]
|
||||
aliases.should.have.length_of(1)
|
||||
aliases.should.contain(
|
||||
{"AliasName": my_alias, "AliasArn": alias_arn, "TargetKeyId": key_id}
|
||||
)
|
||||
assert len(aliases) == 1
|
||||
assert {
|
||||
"AliasName": my_alias,
|
||||
"AliasArn": alias_arn,
|
||||
"TargetKeyId": key_id,
|
||||
} in aliases
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -312,12 +312,14 @@ def test_list_aliases_for_key_arn():
|
||||
client.create_alias(AliasName=arn_alias, TargetKeyId=key_arn)
|
||||
|
||||
aliases = client.list_aliases(KeyId=key_arn)["Aliases"]
|
||||
aliases.should.have.length_of(2)
|
||||
assert len(aliases) == 2
|
||||
for alias in [id_alias, arn_alias]:
|
||||
alias_arn = f"arn:aws:kms:{region}:{ACCOUNT_ID}:{alias}"
|
||||
aliases.should.contain(
|
||||
{"AliasName": alias, "AliasArn": alias_arn, "TargetKeyId": key_id}
|
||||
)
|
||||
assert {
|
||||
"AliasName": alias,
|
||||
"AliasArn": alias_arn,
|
||||
"TargetKeyId": key_id,
|
||||
} in aliases
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -344,9 +346,9 @@ def test_list_keys():
|
||||
k2 = client.create_key(Description="key2")["KeyMetadata"]
|
||||
|
||||
keys = client.list_keys()["Keys"]
|
||||
keys.should.have.length_of(2)
|
||||
keys.should.contain({"KeyId": k1["KeyId"], "KeyArn": k1["Arn"]})
|
||||
keys.should.contain({"KeyId": k2["KeyId"], "KeyArn": k2["Arn"]})
|
||||
assert len(keys) == 2
|
||||
assert {"KeyId": k1["KeyId"], "KeyArn": k1["Arn"]} in keys
|
||||
assert {"KeyId": k2["KeyId"], "KeyArn": k2["Arn"]} in keys
|
||||
|
||||
|
||||
@pytest.mark.parametrize("id_or_arn", ["KeyId", "Arn"])
|
||||
@ -355,19 +357,13 @@ def test_enable_key_rotation(id_or_arn):
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key_id = create_simple_key(client, id_or_arn=id_or_arn)
|
||||
|
||||
client.get_key_rotation_status(KeyId=key_id)["KeyRotationEnabled"].should.equal(
|
||||
False
|
||||
)
|
||||
assert client.get_key_rotation_status(KeyId=key_id)["KeyRotationEnabled"] is False
|
||||
|
||||
client.enable_key_rotation(KeyId=key_id)
|
||||
client.get_key_rotation_status(KeyId=key_id)["KeyRotationEnabled"].should.equal(
|
||||
True
|
||||
)
|
||||
assert client.get_key_rotation_status(KeyId=key_id)["KeyRotationEnabled"] is True
|
||||
|
||||
client.disable_key_rotation(KeyId=key_id)
|
||||
client.get_key_rotation_status(KeyId=key_id)["KeyRotationEnabled"].should.equal(
|
||||
False
|
||||
)
|
||||
assert client.get_key_rotation_status(KeyId=key_id)["KeyRotationEnabled"] is False
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -379,8 +375,8 @@ def test_enable_key_rotation_with_alias_name_should_fail():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.enable_key_rotation(KeyId="alias/my-alias")
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("NotFoundException")
|
||||
err["Message"].should.equal("Invalid keyId alias/my-alias")
|
||||
assert err["Code"] == "NotFoundException"
|
||||
assert err["Message"] == "Invalid keyId alias/my-alias"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -400,7 +396,7 @@ def test_generate_data_key():
|
||||
with pytest.raises(Exception):
|
||||
base64.b64decode(response["Plaintext"], validate=True)
|
||||
|
||||
response["KeyId"].should.equal(key_arn)
|
||||
assert response["KeyId"] == key_arn
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -567,20 +563,20 @@ def test_unknown_tag_methods():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.tag_resource(KeyId="unknown", Tags=[])
|
||||
err = ex.value.response["Error"]
|
||||
err["Message"].should.equal("Invalid keyId unknown")
|
||||
err["Code"].should.equal("NotFoundException")
|
||||
assert err["Message"] == "Invalid keyId unknown"
|
||||
assert err["Code"] == "NotFoundException"
|
||||
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.untag_resource(KeyId="unknown", TagKeys=[])
|
||||
err = ex.value.response["Error"]
|
||||
err["Message"].should.equal("Invalid keyId unknown")
|
||||
err["Code"].should.equal("NotFoundException")
|
||||
assert err["Message"] == "Invalid keyId unknown"
|
||||
assert err["Code"] == "NotFoundException"
|
||||
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.list_resource_tags(KeyId="unknown")
|
||||
err = ex.value.response["Error"]
|
||||
err["Message"].should.equal("Invalid keyId unknown")
|
||||
err["Code"].should.equal("NotFoundException")
|
||||
assert err["Message"] == "Invalid keyId unknown"
|
||||
assert err["Code"] == "NotFoundException"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -599,7 +595,7 @@ def test_list_resource_tags_after_untagging():
|
||||
client.untag_resource(KeyId=key_id, TagKeys=["key2"])
|
||||
|
||||
tags = client.list_resource_tags(KeyId=key_id)["Tags"]
|
||||
tags.should.equal([{"TagKey": "key1", "TagValue": "s1"}])
|
||||
assert tags == [{"TagKey": "key1", "TagValue": "s1"}]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -691,9 +687,7 @@ def test_generate_data_key_all_valid_key_ids(prefix, append_key_id):
|
||||
target_id += key_id
|
||||
|
||||
resp = client.generate_data_key(KeyId=target_id, NumberOfBytes=32)
|
||||
resp.should.have.key("KeyId").equals(
|
||||
f"arn:aws:kms:us-east-1:123456789012:key/{key_id}"
|
||||
)
|
||||
assert resp["KeyId"] == f"arn:aws:kms:us-east-1:123456789012:key/{key_id}"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -715,8 +709,8 @@ def test_generate_random(number_of_bytes):
|
||||
|
||||
response = client.generate_random(NumberOfBytes=number_of_bytes)
|
||||
|
||||
response["Plaintext"].should.be.a(bytes)
|
||||
len(response["Plaintext"]).should.equal(number_of_bytes)
|
||||
assert isinstance(response["Plaintext"], bytes)
|
||||
assert len(response["Plaintext"]) == number_of_bytes
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -828,7 +822,7 @@ def test_get_key_policy(id_or_arn):
|
||||
# PolicyName: Specifies the name of the key policy. The only valid name is default .
|
||||
# But.. why.
|
||||
response = client.get_key_policy(KeyId=key_id, PolicyName="default")
|
||||
response["Policy"].should.equal("my awesome key policy")
|
||||
assert response["Policy"] == "my awesome key policy"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("id_or_arn", ["KeyId", "Arn"])
|
||||
@ -840,7 +834,7 @@ def test_put_key_policy(id_or_arn):
|
||||
client.put_key_policy(KeyId=key_id, PolicyName="default", Policy="policy 2.0")
|
||||
|
||||
response = client.get_key_policy(KeyId=key_id, PolicyName="default")
|
||||
response["Policy"].should.equal("policy 2.0")
|
||||
assert response["Policy"] == "policy 2.0"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -854,11 +848,11 @@ def test_put_key_policy_using_alias_shouldnt_work():
|
||||
KeyId="alias/my-alias", PolicyName="default", Policy="policy 2.0"
|
||||
)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("NotFoundException")
|
||||
err["Message"].should.equal("Invalid keyId alias/my-alias")
|
||||
assert err["Code"] == "NotFoundException"
|
||||
assert err["Message"] == "Invalid keyId alias/my-alias"
|
||||
|
||||
response = client.get_key_policy(KeyId=key_id, PolicyName="default")
|
||||
response["Policy"].should.equal("my policy")
|
||||
assert response["Policy"] == "my policy"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -867,7 +861,7 @@ def test_list_key_policies():
|
||||
key_id = create_simple_key(client)
|
||||
|
||||
policies = client.list_key_policies(KeyId=key_id)
|
||||
policies["PolicyNames"].should.equal(["default"])
|
||||
assert policies["PolicyNames"] == ["default"]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -882,8 +876,8 @@ def test__create_alias__raises_if_reserved_alias(reserved_alias):
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.create_alias(AliasName=reserved_alias, TargetKeyId=key_id)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("NotAuthorizedException")
|
||||
err["Message"].should.equal("")
|
||||
assert err["Code"] == "NotAuthorizedException"
|
||||
assert err["Message"] == ""
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -897,9 +891,10 @@ def test__create_alias__raises_if_alias_has_restricted_characters(name):
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.create_alias(AliasName=name, TargetKeyId=key_id)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
f"1 validation error detected: Value '{name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$"
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== f"1 validation error detected: Value '{name}' at 'aliasName' failed to satisfy constraint: Member must satisfy regular expression pattern: ^[a-zA-Z0-9:/_-]+$"
|
||||
)
|
||||
|
||||
|
||||
@ -912,10 +907,8 @@ def test__create_alias__raises_if_alias_has_restricted_characters_semicolon():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.create_alias(AliasName="alias/my:alias", TargetKeyId=key_id)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
"alias/my:alias contains invalid characters for an alias"
|
||||
)
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert err["Message"] == "alias/my:alias contains invalid characters for an alias"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", ["alias/my-alias_/", "alias/my_alias-/"])
|
||||
@ -938,8 +931,8 @@ def test__create_alias__raises_if_target_key_id_is_existing_alias():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.create_alias(AliasName=name, TargetKeyId=name)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal("Aliases must refer to keys. Not aliases")
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert err["Message"] == "Aliases must refer to keys. Not aliases"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -950,8 +943,8 @@ def test__create_alias__raises_if_wrong_prefix():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.create_alias(AliasName="wrongprefix/my-alias", TargetKeyId=key_id)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal("Invalid identifier")
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert err["Message"] == "Invalid identifier"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -965,9 +958,10 @@ def test__create_alias__raises_if_duplicate():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.create_alias(AliasName=alias, TargetKeyId=key_id)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("AlreadyExistsException")
|
||||
err["Message"].should.equal(
|
||||
f"An alias with the name arn:aws:kms:us-east-1:{ACCOUNT_ID}:alias/my-alias already exists"
|
||||
assert err["Code"] == "AlreadyExistsException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== f"An alias with the name arn:aws:kms:us-east-1:{ACCOUNT_ID}:alias/my-alias already exists"
|
||||
)
|
||||
|
||||
|
||||
@ -994,8 +988,8 @@ def test__delete_alias__raises_if_wrong_prefix():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.delete_alias(AliasName="wrongprefix/my-alias")
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal("Invalid identifier")
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert err["Message"] == "Invalid identifier"
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -1005,9 +999,10 @@ def test__delete_alias__raises_if_alias_is_not_found():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.delete_alias(AliasName="alias/unknown-alias")
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("NotFoundException")
|
||||
err["Message"].should.equal(
|
||||
f"Alias arn:aws:kms:us-east-1:{ACCOUNT_ID}:alias/unknown-alias is not found."
|
||||
assert err["Code"] == "NotFoundException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== f"Alias arn:aws:kms:us-east-1:{ACCOUNT_ID}:alias/unknown-alias is not found."
|
||||
)
|
||||
|
||||
|
||||
@ -1091,9 +1086,9 @@ def test_sign_happy(plaintext):
|
||||
KeyId=key_id, Message=plaintext, SigningAlgorithm=signing_algorithm
|
||||
)
|
||||
|
||||
sign_response["Signature"].should_not.equal(plaintext)
|
||||
sign_response["SigningAlgorithm"].should.equal(signing_algorithm)
|
||||
sign_response["KeyId"].should.equal(key_arn)
|
||||
assert sign_response["Signature"] != plaintext
|
||||
assert sign_response["SigningAlgorithm"] == signing_algorithm
|
||||
assert sign_response["KeyId"] == key_arn
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -1109,9 +1104,10 @@ def test_sign_invalid_signing_algorithm():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
"1 validation error detected: Value 'INVALID' at 'SigningAlgorithm' failed to satisfy constraint: Member must satisfy enum value set: ['RSASSA_PKCS1_V1_5_SHA_256', 'RSASSA_PKCS1_V1_5_SHA_384', 'RSASSA_PKCS1_V1_5_SHA_512', 'RSASSA_PSS_SHA_256', 'RSASSA_PSS_SHA_384', 'RSASSA_PSS_SHA_512']"
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== "1 validation error detected: Value 'INVALID' at 'SigningAlgorithm' failed to satisfy constraint: Member must satisfy enum value set: ['RSASSA_PKCS1_V1_5_SHA_256', 'RSASSA_PKCS1_V1_5_SHA_384', 'RSASSA_PKCS1_V1_5_SHA_512', 'RSASSA_PSS_SHA_256', 'RSASSA_PSS_SHA_384', 'RSASSA_PSS_SHA_512']"
|
||||
)
|
||||
|
||||
|
||||
@ -1132,7 +1128,7 @@ def test_sign_and_verify_ignoring_grant_tokens():
|
||||
GrantTokens=["my-ignored-grant-token"],
|
||||
)
|
||||
|
||||
sign_response["Signature"].should_not.equal(message)
|
||||
assert sign_response["Signature"] != message
|
||||
|
||||
verify_response = client.verify(
|
||||
KeyId=key_id,
|
||||
@ -1142,7 +1138,7 @@ def test_sign_and_verify_ignoring_grant_tokens():
|
||||
GrantTokens=["my-ignored-grant-token"],
|
||||
)
|
||||
|
||||
verify_response["SignatureValid"].should.equal(True)
|
||||
assert verify_response["SignatureValid"] is True
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -1172,7 +1168,7 @@ def test_sign_and_verify_digest_message_type_256():
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
)
|
||||
|
||||
verify_response["SignatureValid"].should.equal(True)
|
||||
assert verify_response["SignatureValid"] is True
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -1188,9 +1184,10 @@ def test_sign_invalid_key_usage():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
f"1 validation error detected: Value '{key_id}' at 'KeyId' failed to satisfy constraint: Member must point to a key with usage: 'SIGN_VERIFY'"
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== f"1 validation error detected: Value '{key_id}' at 'KeyId' failed to satisfy constraint: Member must point to a key with usage: 'SIGN_VERIFY'"
|
||||
)
|
||||
|
||||
|
||||
@ -1207,9 +1204,10 @@ def test_sign_invalid_message():
|
||||
with pytest.raises(ClientError) as ex:
|
||||
client.sign(KeyId=key_id, Message=message, SigningAlgorithm=signing_algorithm)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
"1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1"
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== "1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1"
|
||||
)
|
||||
|
||||
|
||||
@ -1236,9 +1234,9 @@ def test_verify_happy(plaintext):
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
)
|
||||
|
||||
verify_response["SigningAlgorithm"].should.equal(signing_algorithm)
|
||||
verify_response["KeyId"].should.equal(key_arn)
|
||||
verify_response["SignatureValid"].should.equal(True)
|
||||
assert verify_response["SigningAlgorithm"] == signing_algorithm
|
||||
assert verify_response["KeyId"] == key_arn
|
||||
assert verify_response["SignatureValid"] is True
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -1257,9 +1255,9 @@ def test_verify_happy_with_invalid_signature():
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
)
|
||||
|
||||
verify_response["SigningAlgorithm"].should.equal(signing_algorithm)
|
||||
verify_response["KeyId"].should.equal(key_arn)
|
||||
verify_response["SignatureValid"].should.equal(False)
|
||||
assert verify_response["SigningAlgorithm"] == signing_algorithm
|
||||
assert verify_response["KeyId"] == key_arn
|
||||
assert verify_response["SignatureValid"] is False
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -1281,9 +1279,10 @@ def test_verify_invalid_signing_algorithm():
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
"1 validation error detected: Value 'INVALID' at 'SigningAlgorithm' failed to satisfy constraint: Member must satisfy enum value set: ['RSASSA_PKCS1_V1_5_SHA_256', 'RSASSA_PKCS1_V1_5_SHA_384', 'RSASSA_PKCS1_V1_5_SHA_512', 'RSASSA_PSS_SHA_256', 'RSASSA_PSS_SHA_384', 'RSASSA_PSS_SHA_512']"
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== "1 validation error detected: Value 'INVALID' at 'SigningAlgorithm' failed to satisfy constraint: Member must satisfy enum value set: ['RSASSA_PKCS1_V1_5_SHA_256', 'RSASSA_PKCS1_V1_5_SHA_384', 'RSASSA_PKCS1_V1_5_SHA_512', 'RSASSA_PSS_SHA_256', 'RSASSA_PSS_SHA_384', 'RSASSA_PSS_SHA_512']"
|
||||
)
|
||||
|
||||
|
||||
@ -1304,9 +1303,10 @@ def test_verify_invalid_message():
|
||||
)
|
||||
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
"1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1"
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== "1 validation error detected: Value at 'Message' failed to satisfy constraint: Member must have length greater than or equal to 1"
|
||||
)
|
||||
|
||||
|
||||
@ -1329,9 +1329,10 @@ def test_verify_empty_signature():
|
||||
SigningAlgorithm=signing_algorithm,
|
||||
)
|
||||
err = ex.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
"1 validation error detected: Value at 'Signature' failed to satisfy constraint: Member must have length greater than or equal to 1"
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== "1 validation error detected: Value at 'Signature' failed to satisfy constraint: Member must have length greater than or equal to 1"
|
||||
)
|
||||
|
||||
|
||||
@ -1342,11 +1343,12 @@ def test_get_public_key():
|
||||
key_id = key["KeyMetadata"]["KeyId"]
|
||||
public_key_response = client.get_public_key(KeyId=key_id)
|
||||
|
||||
public_key_response.should.contain("PublicKey")
|
||||
public_key_response["SigningAlgorithms"].should.equal(
|
||||
key["KeyMetadata"]["SigningAlgorithms"]
|
||||
assert "PublicKey" in public_key_response
|
||||
assert (
|
||||
public_key_response["SigningAlgorithms"]
|
||||
== key["KeyMetadata"]["SigningAlgorithms"]
|
||||
)
|
||||
public_key_response.shouldnt.contain("EncryptionAlgorithms")
|
||||
assert "EncryptionAlgorithms" not in public_key_response
|
||||
|
||||
|
||||
def create_simple_key(client, id_or_arn="KeyId", description=None, policy=None):
|
||||
|
@ -1,7 +1,6 @@
|
||||
import base64
|
||||
|
||||
import boto3
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
from botocore.exceptions import ClientError
|
||||
import pytest
|
||||
|
||||
@ -16,9 +15,10 @@ def test_create_key_with_empty_content():
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client_kms.encrypt(KeyId=metadata["KeyId"], Plaintext="")
|
||||
err = exc.value.response["Error"]
|
||||
err["Code"].should.equal("ValidationException")
|
||||
err["Message"].should.equal(
|
||||
"1 validation error detected: Value at 'plaintext' failed to satisfy constraint: Member must have length greater than or equal to 1"
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
err["Message"]
|
||||
== "1 validation error detected: Value at 'plaintext' failed to satisfy constraint: Member must have length greater than or equal to 1"
|
||||
)
|
||||
|
||||
|
||||
@ -32,13 +32,13 @@ def test_encrypt(plaintext):
|
||||
key_arn = key["KeyMetadata"]["Arn"]
|
||||
|
||||
response = client.encrypt(KeyId=key_id, Plaintext=plaintext)
|
||||
response["CiphertextBlob"].should_not.equal(plaintext)
|
||||
assert response["CiphertextBlob"] != plaintext
|
||||
|
||||
# CiphertextBlob must NOT be base64-encoded
|
||||
with pytest.raises(Exception):
|
||||
base64.b64decode(response["CiphertextBlob"], validate=True)
|
||||
|
||||
response["KeyId"].should.equal(key_arn)
|
||||
assert response["KeyId"] == key_arn
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -116,8 +116,8 @@ def test_decrypt(plaintext):
|
||||
with pytest.raises(Exception):
|
||||
base64.b64decode(decrypt_response["Plaintext"], validate=True)
|
||||
|
||||
decrypt_response["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response["KeyId"].should.equal(key_arn)
|
||||
assert decrypt_response["Plaintext"] == _get_encoded_value(plaintext)
|
||||
assert decrypt_response["KeyId"] == key_arn
|
||||
|
||||
|
||||
@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS)
|
||||
@ -128,7 +128,7 @@ def test_kms_encrypt(plaintext):
|
||||
response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext)
|
||||
|
||||
response = client.decrypt(CiphertextBlob=response["CiphertextBlob"])
|
||||
response["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
assert response["Plaintext"] == _get_encoded_value(plaintext)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("plaintext", PLAINTEXT_VECTORS)
|
||||
@ -158,24 +158,24 @@ def test_re_encrypt_decrypt(plaintext):
|
||||
with pytest.raises(Exception):
|
||||
base64.b64decode(re_encrypt_response["CiphertextBlob"], validate=True)
|
||||
|
||||
re_encrypt_response["SourceKeyId"].should.equal(key_1_arn)
|
||||
re_encrypt_response["KeyId"].should.equal(key_2_arn)
|
||||
assert re_encrypt_response["SourceKeyId"] == key_1_arn
|
||||
assert re_encrypt_response["KeyId"] == key_2_arn
|
||||
|
||||
decrypt_response_1 = client.decrypt(
|
||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||
EncryptionContext={"encryption": "context"},
|
||||
)
|
||||
decrypt_response_1["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response_1["KeyId"].should.equal(key_1_arn)
|
||||
assert decrypt_response_1["Plaintext"] == _get_encoded_value(plaintext)
|
||||
assert decrypt_response_1["KeyId"] == key_1_arn
|
||||
|
||||
decrypt_response_2 = client.decrypt(
|
||||
CiphertextBlob=re_encrypt_response["CiphertextBlob"],
|
||||
EncryptionContext={"another": "context"},
|
||||
)
|
||||
decrypt_response_2["Plaintext"].should.equal(_get_encoded_value(plaintext))
|
||||
decrypt_response_2["KeyId"].should.equal(key_2_arn)
|
||||
assert decrypt_response_2["Plaintext"] == _get_encoded_value(plaintext)
|
||||
assert decrypt_response_2["KeyId"] == key_2_arn
|
||||
|
||||
decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"])
|
||||
assert decrypt_response_1["Plaintext"] == decrypt_response_2["Plaintext"]
|
||||
|
||||
|
||||
@mock_kms
|
||||
|
@ -1,5 +1,4 @@
|
||||
import boto3
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
import pytest
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from unittest import mock
|
||||
@ -24,8 +23,8 @@ def test_create_grant():
|
||||
Operations=["DECRYPT"],
|
||||
Name="testgrant",
|
||||
)
|
||||
resp.should.have.key("GrantId")
|
||||
resp.should.have.key("GrantToken")
|
||||
assert "GrantId" in resp
|
||||
assert "GrantToken" in resp
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -33,7 +32,7 @@ def test_list_grants():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key_id = create_key(client)
|
||||
|
||||
client.list_grants(KeyId=key_id).should.have.key("Grants").equals([])
|
||||
assert client.list_grants(KeyId=key_id)["Grants"] == []
|
||||
|
||||
grant_id1 = client.create_grant(
|
||||
KeyId=key_id,
|
||||
@ -51,33 +50,33 @@ def test_list_grants():
|
||||
|
||||
# List all
|
||||
grants = client.list_grants(KeyId=key_id)["Grants"]
|
||||
grants.should.have.length_of(2)
|
||||
assert len(grants) == 2
|
||||
grant_1 = [grant for grant in grants if grant["GrantId"] == grant_id1][0]
|
||||
grant_2 = [grant for grant in grants if grant["GrantId"] == grant_id2][0]
|
||||
|
||||
grant_1.should.have.key("KeyId").equals(key_id)
|
||||
grant_1.should.have.key("GrantId").equals(grant_id1)
|
||||
grant_1.should.have.key("Name").equals("testgrant")
|
||||
grant_1.should.have.key("GranteePrincipal").equals(grantee_principal)
|
||||
grant_1.should.have.key("Operations").equals(["DECRYPT"])
|
||||
assert grant_1["KeyId"] == key_id
|
||||
assert grant_1["GrantId"] == grant_id1
|
||||
assert grant_1["Name"] == "testgrant"
|
||||
assert grant_1["GranteePrincipal"] == grantee_principal
|
||||
assert grant_1["Operations"] == ["DECRYPT"]
|
||||
|
||||
grant_2.should.have.key("KeyId").equals(key_id)
|
||||
grant_2.should.have.key("GrantId").equals(grant_id2)
|
||||
grant_2.shouldnt.have.key("Name")
|
||||
grant_2.should.have.key("GranteePrincipal").equals(grantee_principal)
|
||||
grant_2.should.have.key("Operations").equals(["DECRYPT", "ENCRYPT"])
|
||||
grant_2.should.have.key("Constraints").equals(
|
||||
{"EncryptionContextSubset": {"baz": "kaz", "foo": "bar"}}
|
||||
)
|
||||
assert grant_2["KeyId"] == key_id
|
||||
assert grant_2["GrantId"] == grant_id2
|
||||
assert "Name" not in grant_2
|
||||
assert grant_2["GranteePrincipal"] == grantee_principal
|
||||
assert grant_2["Operations"] == ["DECRYPT", "ENCRYPT"]
|
||||
assert grant_2["Constraints"] == {
|
||||
"EncryptionContextSubset": {"baz": "kaz", "foo": "bar"}
|
||||
}
|
||||
|
||||
# List by grant_id
|
||||
grants = client.list_grants(KeyId=key_id, GrantId=grant_id2)["Grants"]
|
||||
grants.should.have.length_of(1)
|
||||
grants[0]["GrantId"].should.equal(grant_id2)
|
||||
assert len(grants) == 1
|
||||
assert grants[0]["GrantId"] == grant_id2
|
||||
|
||||
# List by unknown grant_id
|
||||
grants = client.list_grants(KeyId=key_id, GrantId="unknown")["Grants"]
|
||||
grants.should.have.length_of(0)
|
||||
assert len(grants) == 0
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -114,9 +113,9 @@ def test_list_retirable_grants():
|
||||
|
||||
# List only the grants from the retiring principal
|
||||
grants = client.list_retirable_grants(RetiringPrincipal="principal")["Grants"]
|
||||
grants.should.have.length_of(1)
|
||||
grants[0]["KeyId"].should.equal(key_id2)
|
||||
grants[0]["GrantId"].should.equal(grant2_key2)
|
||||
assert len(grants) == 1
|
||||
assert grants[0]["KeyId"] == key_id2
|
||||
assert grants[0]["GrantId"] == grant2_key2
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -125,7 +124,7 @@ def test_revoke_grant():
|
||||
client = boto3.client("kms", region_name="us-east-1")
|
||||
key_id = create_key(client)
|
||||
|
||||
client.list_grants(KeyId=key_id).should.have.key("Grants").equals([])
|
||||
assert client.list_grants(KeyId=key_id)["Grants"] == []
|
||||
|
||||
grant_id = client.create_grant(
|
||||
KeyId=key_id,
|
||||
@ -136,7 +135,7 @@ def test_revoke_grant():
|
||||
|
||||
client.revoke_grant(KeyId=key_id, GrantId=grant_id)
|
||||
|
||||
client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(0)
|
||||
assert len(client.list_grants(KeyId=key_id)["Grants"]) == 0
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -148,9 +147,10 @@ def test_revoke_grant_raises_when_grant_does_not_exist():
|
||||
with pytest.raises(client.exceptions.NotFoundException) as ex:
|
||||
client.revoke_grant(KeyId=key_id, GrantId=not_existent_grant_id)
|
||||
|
||||
ex.value.response["Error"]["Code"].should.equal("NotFoundException")
|
||||
ex.value.response["Error"]["Message"].should.equal(
|
||||
f"Grant ID {not_existent_grant_id} not found"
|
||||
assert ex.value.response["Error"]["Code"] == "NotFoundException"
|
||||
assert (
|
||||
ex.value.response["Error"]["Message"]
|
||||
== f"Grant ID {not_existent_grant_id} not found"
|
||||
)
|
||||
|
||||
|
||||
@ -170,7 +170,7 @@ def test_retire_grant_by_token():
|
||||
|
||||
client.retire_grant(GrantToken=grant_token)
|
||||
|
||||
client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(2)
|
||||
assert len(client.list_grants(KeyId=key_id)["Grants"]) == 2
|
||||
|
||||
|
||||
@mock_kms
|
||||
@ -189,7 +189,7 @@ def test_retire_grant_by_grant_id():
|
||||
|
||||
client.retire_grant(KeyId=key_id, GrantId=grant_id)
|
||||
|
||||
client.list_grants(KeyId=key_id)["Grants"].should.have.length_of(2)
|
||||
assert len(client.list_grants(KeyId=key_id)["Grants"]) == 2
|
||||
|
||||
|
||||
def create_key(client):
|
||||
|
@ -46,7 +46,7 @@ class TestKMSPolicyEnforcement:
|
||||
with pytest.raises(ClientError) as exc:
|
||||
self.client.describe_key(KeyId=self.key_id)
|
||||
err = exc.value.response["Error"]
|
||||
err["Code"].should.equal("AccessDeniedException")
|
||||
assert err["Code"] == "AccessDeniedException"
|
||||
|
||||
@pytest.mark.parametrize("actions", [["kms:unknown"], ["kms:describestuff"]])
|
||||
def test_policy__allow_based_on_actions(self, actions):
|
||||
@ -67,7 +67,7 @@ class TestKMSPolicyEnforcement:
|
||||
Policy=json.dumps(policy), PolicyName="default", KeyId=self.key_id
|
||||
)
|
||||
key = self.client.describe_key(KeyId=self.key_id)["KeyMetadata"]
|
||||
key["Description"].should.equal("t")
|
||||
assert key["Description"] == "t"
|
||||
|
||||
|
||||
class TestKMSPolicyValidator:
|
||||
|
@ -1,5 +1,4 @@
|
||||
import json
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
|
||||
import moto.server as server
|
||||
from moto import mock_kms
|
||||
@ -15,7 +14,6 @@ def test_list_keys():
|
||||
test_client = backend.test_client()
|
||||
|
||||
res = test_client.get("/?Action=ListKeys")
|
||||
body = json.loads(res.data.decode("utf-8"))
|
||||
|
||||
json.loads(res.data.decode("utf-8")).should.equal(
|
||||
{"Keys": [], "NextMarker": None, "Truncated": False}
|
||||
)
|
||||
assert body == {"Keys": [], "NextMarker": None, "Truncated": False}
|
||||
|
@ -1,4 +1,3 @@
|
||||
import sure # noqa # pylint: disable=unused-import
|
||||
import pytest
|
||||
|
||||
from moto.kms.exceptions import (
|
||||
@ -60,40 +59,40 @@ CIPHERTEXT_BLOB_VECTORS = [
|
||||
def test_generate_data_key():
|
||||
test = generate_data_key(123)
|
||||
|
||||
test.should.be.a(bytes)
|
||||
len(test).should.equal(123)
|
||||
assert isinstance(test, bytes)
|
||||
assert len(test) == 123
|
||||
|
||||
|
||||
def test_generate_master_key():
|
||||
test = generate_master_key()
|
||||
|
||||
test.should.be.a(bytes)
|
||||
len(test).should.equal(MASTER_KEY_LEN)
|
||||
assert isinstance(test, bytes)
|
||||
assert len(test) == MASTER_KEY_LEN
|
||||
|
||||
|
||||
@pytest.mark.parametrize("raw,serialized", ENCRYPTION_CONTEXT_VECTORS)
|
||||
def test_serialize_encryption_context(raw, serialized):
|
||||
test = _serialize_encryption_context(raw)
|
||||
test.should.equal(serialized)
|
||||
assert test == serialized
|
||||
|
||||
|
||||
@pytest.mark.parametrize("raw,_serialized", CIPHERTEXT_BLOB_VECTORS)
|
||||
def test_cycle_ciphertext_blob(raw, _serialized):
|
||||
test_serialized = _serialize_ciphertext_blob(raw)
|
||||
test_deserialized = _deserialize_ciphertext_blob(test_serialized)
|
||||
test_deserialized.should.equal(raw)
|
||||
assert test_deserialized == raw
|
||||
|
||||
|
||||
@pytest.mark.parametrize("raw,serialized", CIPHERTEXT_BLOB_VECTORS)
|
||||
def test_serialize_ciphertext_blob(raw, serialized):
|
||||
test = _serialize_ciphertext_blob(raw)
|
||||
test.should.equal(serialized)
|
||||
assert test == serialized
|
||||
|
||||
|
||||
@pytest.mark.parametrize("raw,serialized", CIPHERTEXT_BLOB_VECTORS)
|
||||
def test_deserialize_ciphertext_blob(raw, serialized):
|
||||
test = _deserialize_ciphertext_blob(serialized)
|
||||
test.should.equal(raw)
|
||||
assert test == raw
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -110,15 +109,15 @@ def test_encrypt_decrypt_cycle(encryption_context):
|
||||
plaintext=plaintext,
|
||||
encryption_context=encryption_context,
|
||||
)
|
||||
ciphertext_blob.should_not.equal(plaintext)
|
||||
assert ciphertext_blob != plaintext
|
||||
|
||||
decrypted, decrypting_key_id = decrypt(
|
||||
master_keys=master_key_map,
|
||||
ciphertext_blob=ciphertext_blob,
|
||||
encryption_context=encryption_context,
|
||||
)
|
||||
decrypted.should.equal(plaintext)
|
||||
decrypting_key_id.should.equal(master_key.id)
|
||||
assert decrypted == plaintext
|
||||
assert decrypting_key_id == master_key.id
|
||||
|
||||
|
||||
def test_encrypt_unknown_key_id():
|
||||
|
Loading…
Reference in New Issue
Block a user