convert tests from boto to boto3 and add unicode plaintext vector to test auto-conversion
This commit is contained in:
parent
c44178f2f7
commit
24832982d4
@ -23,6 +23,7 @@ from moto import mock_kms, mock_kms_deprecated
|
|||||||
PLAINTEXT_VECTORS = (
|
PLAINTEXT_VECTORS = (
|
||||||
(b"some encodeable plaintext",),
|
(b"some encodeable plaintext",),
|
||||||
(b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16",),
|
(b"some unencodeable plaintext \xec\x8a\xcf\xb6r\xe9\xb5\xeb\xff\xa23\x16",),
|
||||||
|
(u"some unicode characters ø˚∆øˆˆ∆ßçøˆˆçßøˆ¨¥",),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -215,15 +216,15 @@ def test_boto3_generate_data_key():
|
|||||||
|
|
||||||
|
|
||||||
@parameterized(PLAINTEXT_VECTORS)
|
@parameterized(PLAINTEXT_VECTORS)
|
||||||
@mock_kms_deprecated
|
@mock_kms
|
||||||
def test_encrypt(plaintext):
|
def test_encrypt(plaintext):
|
||||||
conn = boto.kms.connect_to_region("us-west-2")
|
client = boto3.client("kms", region_name="us-west-2")
|
||||||
|
|
||||||
key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT")
|
key = client.create_key(Description="key")
|
||||||
key_id = key["KeyMetadata"]["KeyId"]
|
key_id = key["KeyMetadata"]["KeyId"]
|
||||||
key_arn = key["KeyMetadata"]["Arn"]
|
key_arn = key["KeyMetadata"]["Arn"]
|
||||||
|
|
||||||
response = conn.encrypt(key_id, plaintext)
|
response = client.encrypt(KeyId=key_id, Plaintext=plaintext)
|
||||||
response["CiphertextBlob"].should_not.equal(plaintext)
|
response["CiphertextBlob"].should_not.equal(plaintext)
|
||||||
|
|
||||||
# CiphertextBlob must NOT be base64-encoded
|
# CiphertextBlob must NOT be base64-encoded
|
||||||
@ -234,27 +235,33 @@ def test_encrypt(plaintext):
|
|||||||
|
|
||||||
|
|
||||||
@parameterized(PLAINTEXT_VECTORS)
|
@parameterized(PLAINTEXT_VECTORS)
|
||||||
@mock_kms_deprecated
|
@mock_kms
|
||||||
def test_decrypt(plaintext):
|
def test_decrypt(plaintext):
|
||||||
conn = boto.kms.connect_to_region("us-west-2")
|
client = boto3.client("kms", region_name="us-west-2")
|
||||||
|
|
||||||
key = conn.create_key(policy="my policy", description="my key", key_usage="ENCRYPT_DECRYPT")
|
key = client.create_key(Description="key")
|
||||||
key_id = key["KeyMetadata"]["KeyId"]
|
key_id = key["KeyMetadata"]["KeyId"]
|
||||||
key_arn = key["KeyMetadata"]["Arn"]
|
key_arn = key["KeyMetadata"]["Arn"]
|
||||||
|
|
||||||
encrypt_response = conn.encrypt(key_id, plaintext)
|
encrypt_response = client.encrypt(KeyId=key_id, Plaintext=plaintext)
|
||||||
|
|
||||||
|
try:
|
||||||
|
encoded_plaintext = plaintext.encode("utf-8")
|
||||||
|
except AttributeError:
|
||||||
|
encoded_plaintext = plaintext
|
||||||
|
|
||||||
|
client.create_key(Description="key")
|
||||||
# CiphertextBlob must NOT be base64-encoded
|
# CiphertextBlob must NOT be base64-encoded
|
||||||
with assert_raises(Exception):
|
with assert_raises(Exception):
|
||||||
base64.b64decode(encrypt_response["CiphertextBlob"], validate=True)
|
base64.b64decode(encrypt_response["CiphertextBlob"], validate=True)
|
||||||
|
|
||||||
decrypt_response = conn.decrypt(encrypt_response["CiphertextBlob"])
|
decrypt_response = client.decrypt(CiphertextBlob=encrypt_response["CiphertextBlob"])
|
||||||
|
|
||||||
# Plaintext must NOT be base64-encoded
|
# Plaintext must NOT be base64-encoded
|
||||||
with assert_raises(Exception):
|
with assert_raises(Exception):
|
||||||
base64.b64decode(decrypt_response["Plaintext"], validate=True)
|
base64.b64decode(decrypt_response["Plaintext"], validate=True)
|
||||||
|
|
||||||
decrypt_response["Plaintext"].should.equal(plaintext)
|
decrypt_response["Plaintext"].should.equal(encoded_plaintext)
|
||||||
decrypt_response["KeyId"].should.equal(key_arn)
|
decrypt_response["KeyId"].should.equal(key_arn)
|
||||||
|
|
||||||
|
|
||||||
@ -682,8 +689,13 @@ def test_kms_encrypt_boto3(plaintext):
|
|||||||
key = client.create_key(Description="key")
|
key = client.create_key(Description="key")
|
||||||
response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext)
|
response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext)
|
||||||
|
|
||||||
|
try:
|
||||||
|
encoded_plaintext = plaintext.encode("utf-8")
|
||||||
|
except AttributeError:
|
||||||
|
encoded_plaintext = plaintext
|
||||||
|
|
||||||
response = client.decrypt(CiphertextBlob=response["CiphertextBlob"])
|
response = client.decrypt(CiphertextBlob=response["CiphertextBlob"])
|
||||||
response["Plaintext"].should.equal(plaintext)
|
response["Plaintext"].should.equal(encoded_plaintext)
|
||||||
|
|
||||||
|
|
||||||
@mock_kms
|
@mock_kms
|
||||||
@ -906,6 +918,11 @@ def test_re_encrypt_decrypt(plaintext):
|
|||||||
EncryptionContext={"encryption": "context"},
|
EncryptionContext={"encryption": "context"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
encoded_plaintext = plaintext.encode("utf-8")
|
||||||
|
except AttributeError:
|
||||||
|
encoded_plaintext = plaintext
|
||||||
|
|
||||||
re_encrypt_response = client.re_encrypt(
|
re_encrypt_response = client.re_encrypt(
|
||||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||||
SourceEncryptionContext={"encryption": "context"},
|
SourceEncryptionContext={"encryption": "context"},
|
||||||
@ -924,14 +941,14 @@ def test_re_encrypt_decrypt(plaintext):
|
|||||||
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
CiphertextBlob=encrypt_response["CiphertextBlob"],
|
||||||
EncryptionContext={"encryption": "context"},
|
EncryptionContext={"encryption": "context"},
|
||||||
)
|
)
|
||||||
decrypt_response_1["Plaintext"].should.equal(plaintext)
|
decrypt_response_1["Plaintext"].should.equal(encoded_plaintext)
|
||||||
decrypt_response_1["KeyId"].should.equal(key_1_arn)
|
decrypt_response_1["KeyId"].should.equal(key_1_arn)
|
||||||
|
|
||||||
decrypt_response_2 = client.decrypt(
|
decrypt_response_2 = client.decrypt(
|
||||||
CiphertextBlob=re_encrypt_response["CiphertextBlob"],
|
CiphertextBlob=re_encrypt_response["CiphertextBlob"],
|
||||||
EncryptionContext={"another": "context"},
|
EncryptionContext={"another": "context"},
|
||||||
)
|
)
|
||||||
decrypt_response_2["Plaintext"].should.equal(plaintext)
|
decrypt_response_2["Plaintext"].should.equal(encoded_plaintext)
|
||||||
decrypt_response_2["KeyId"].should.equal(key_2_arn)
|
decrypt_response_2["KeyId"].should.equal(key_2_arn)
|
||||||
|
|
||||||
decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"])
|
decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"])
|
||||||
|
Loading…
Reference in New Issue
Block a user