From 4c7cdec96542d0d2bddc0e5528612c126a9a2e46 Mon Sep 17 00:00:00 2001 From: mattsb42-aws Date: Fri, 13 Sep 2019 14:08:26 -0700 Subject: [PATCH] fix encoding for Python 2 in KMS tests --- tests/test_kms/test_kms.py | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/tests/test_kms/test_kms.py b/tests/test_kms/test_kms.py index 150dfae8f..49c0f886e 100644 --- a/tests/test_kms/test_kms.py +++ b/tests/test_kms/test_kms.py @@ -1,15 +1,16 @@ +# -*- coding: utf-8 -*- from __future__ import unicode_literals from datetime import date from datetime import datetime from dateutil.tz import tzutc import base64 -import binascii import os import re import boto3 import boto.kms import botocore.exceptions +import six import sure # noqa from boto.exception import JSONResponseError from boto.kms.exceptions import AlreadyExistsException, NotFoundException @@ -27,6 +28,13 @@ PLAINTEXT_VECTORS = ( ) +def _get_encoded_value(plaintext): + if isinstance(plaintext, six.binary_type): + return plaintext + + return plaintext.encode("utf-8") + + @mock_kms def test_create_key(): conn = boto3.client("kms", region_name="us-east-1") @@ -245,11 +253,6 @@ def test_decrypt(plaintext): encrypt_response = client.encrypt(KeyId=key_id, Plaintext=plaintext) - try: - encoded_plaintext = plaintext.encode("utf-8") - except AttributeError: - encoded_plaintext = plaintext - client.create_key(Description="key") # CiphertextBlob must NOT be base64-encoded with assert_raises(Exception): @@ -261,7 +264,7 @@ def test_decrypt(plaintext): with assert_raises(Exception): base64.b64decode(decrypt_response["Plaintext"], validate=True) - decrypt_response["Plaintext"].should.equal(encoded_plaintext) + decrypt_response["Plaintext"].should.equal(_get_encoded_value(plaintext)) decrypt_response["KeyId"].should.equal(key_arn) @@ -689,13 +692,8 @@ def test_kms_encrypt_boto3(plaintext): key = client.create_key(Description="key") response = client.encrypt(KeyId=key["KeyMetadata"]["KeyId"], Plaintext=plaintext) - try: - encoded_plaintext = plaintext.encode("utf-8") - except AttributeError: - encoded_plaintext = plaintext - response = client.decrypt(CiphertextBlob=response["CiphertextBlob"]) - response["Plaintext"].should.equal(encoded_plaintext) + response["Plaintext"].should.equal(_get_encoded_value(plaintext)) @mock_kms @@ -918,11 +916,6 @@ def test_re_encrypt_decrypt(plaintext): EncryptionContext={"encryption": "context"}, ) - try: - encoded_plaintext = plaintext.encode("utf-8") - except AttributeError: - encoded_plaintext = plaintext - re_encrypt_response = client.re_encrypt( CiphertextBlob=encrypt_response["CiphertextBlob"], SourceEncryptionContext={"encryption": "context"}, @@ -941,14 +934,14 @@ def test_re_encrypt_decrypt(plaintext): CiphertextBlob=encrypt_response["CiphertextBlob"], EncryptionContext={"encryption": "context"}, ) - decrypt_response_1["Plaintext"].should.equal(encoded_plaintext) + decrypt_response_1["Plaintext"].should.equal(_get_encoded_value(plaintext)) decrypt_response_1["KeyId"].should.equal(key_1_arn) decrypt_response_2 = client.decrypt( CiphertextBlob=re_encrypt_response["CiphertextBlob"], EncryptionContext={"another": "context"}, ) - decrypt_response_2["Plaintext"].should.equal(encoded_plaintext) + decrypt_response_2["Plaintext"].should.equal(_get_encoded_value(plaintext)) decrypt_response_2["KeyId"].should.equal(key_2_arn) decrypt_response_1["Plaintext"].should.equal(decrypt_response_2["Plaintext"])