Merge pull request #732 from JackDanger/jack/implement-kms-encryption
Implementing KMS encrypt/decrypt
This commit is contained in:
commit
5dbb22265c
@ -1,5 +1,6 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import base64
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
|
|
||||||
@ -213,6 +214,19 @@ class KmsResponse(BaseResponse):
|
|||||||
|
|
||||||
return json.dumps({'Truncated': False, 'PolicyNames': ['default']})
|
return json.dumps({'Truncated': False, 'PolicyNames': ['default']})
|
||||||
|
|
||||||
|
def encrypt(self):
|
||||||
|
"""
|
||||||
|
We perform no encryption, we just encode the value as base64 and then
|
||||||
|
decode it in decrypt().
|
||||||
|
"""
|
||||||
|
value = self.parameters.get("Plaintext")
|
||||||
|
return json.dumps({"CiphertextBlob": base64.b64encode(value).encode("utf-8")})
|
||||||
|
|
||||||
|
def decrypt(self):
|
||||||
|
value = self.parameters.get("CiphertextBlob")
|
||||||
|
return json.dumps({"Plaintext": base64.b64decode(value).encode("utf-8")})
|
||||||
|
|
||||||
|
|
||||||
def _assert_valid_key_id(key_id):
|
def _assert_valid_key_id(key_id):
|
||||||
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
|
if not re.match(r'^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$', key_id, re.IGNORECASE):
|
||||||
raise JSONResponseError(404, 'Not Found', body={'message': ' Invalid keyId', '__type': 'NotFoundException'})
|
raise JSONResponseError(404, 'Not Found', body={'message': ' Invalid keyId', '__type': 'NotFoundException'})
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
import re
|
import re
|
||||||
|
import six
|
||||||
|
|
||||||
import boto.kms
|
import boto.kms
|
||||||
from boto.exception import JSONResponseError
|
from boto.exception import JSONResponseError
|
||||||
@ -136,6 +137,27 @@ def test_disable_key_rotation():
|
|||||||
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(False)
|
conn.get_key_rotation_status(key_id)['KeyRotationEnabled'].should.equal(False)
|
||||||
|
|
||||||
|
|
||||||
|
# Scoping encryption/decryption to only Python 2 because our test suite
|
||||||
|
# hardcodes a dependency on boto version 2.36.0 which is not compatible with
|
||||||
|
# Python 3 (2.40+, however, passes these tests).
|
||||||
|
if six.PY2:
|
||||||
|
@mock_kms
|
||||||
|
def test_encrypt():
|
||||||
|
"""
|
||||||
|
Using base64 encoding to merely test that the endpoint was called
|
||||||
|
"""
|
||||||
|
conn = boto.kms.connect_to_region("us-west-2")
|
||||||
|
response = conn.encrypt('key_id', 'encryptme'.encode('utf-8'))
|
||||||
|
response['CiphertextBlob'].should.equal('ZW5jcnlwdG1l')
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
def test_decrypt():
|
||||||
|
conn = boto.kms.connect_to_region('us-west-2')
|
||||||
|
response = conn.decrypt('ZW5jcnlwdG1l'.encode('utf-8'))
|
||||||
|
response['Plaintext'].should.equal('encryptme')
|
||||||
|
|
||||||
|
|
||||||
@mock_kms
|
@mock_kms
|
||||||
def test_disable_key_rotation_with_missing_key():
|
def test_disable_key_rotation_with_missing_key():
|
||||||
conn = boto.kms.connect_to_region("us-west-2")
|
conn = boto.kms.connect_to_region("us-west-2")
|
||||||
|
Loading…
Reference in New Issue
Block a user