updates kms to use tagging service and support untag_resource
This commit is contained in:
parent
445f474534
commit
d1efedec29
@ -7,27 +7,18 @@ from datetime import datetime, timedelta
|
|||||||
from boto3 import Session
|
from boto3 import Session
|
||||||
|
|
||||||
from moto.core import BaseBackend, BaseModel
|
from moto.core import BaseBackend, BaseModel
|
||||||
<<<<<<< HEAD
|
|
||||||
from moto.core.exceptions import JsonRESTError
|
|
||||||
from moto.core.utils import iso_8601_datetime_without_milliseconds
|
|
||||||
from moto.utilities.tagging_service import TaggingService
|
|
||||||
=======
|
|
||||||
from moto.core.utils import unix_time
|
from moto.core.utils import unix_time
|
||||||
|
from moto.utilities.tagging_service import TaggingService
|
||||||
|
from moto.core.exceptions import JsonRESTError
|
||||||
from moto.iam.models import ACCOUNT_ID
|
from moto.iam.models import ACCOUNT_ID
|
||||||
|
|
||||||
>>>>>>> 100dbd529f174f18d579a1dcc066d55409f2e38f
|
|
||||||
from .utils import decrypt, encrypt, generate_key_id, generate_master_key
|
from .utils import decrypt, encrypt, generate_key_id, generate_master_key
|
||||||
|
|
||||||
|
|
||||||
class Key(BaseModel):
|
class Key(BaseModel):
|
||||||
<<<<<<< HEAD
|
|
||||||
def __init__(self, policy, key_usage, description, region):
|
|
||||||
=======
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, policy, key_usage, customer_master_key_spec, description, tags, region
|
self, policy, key_usage, customer_master_key_spec, description, region
|
||||||
):
|
):
|
||||||
>>>>>>> 100dbd529f174f18d579a1dcc066d55409f2e38f
|
|
||||||
self.id = generate_key_id()
|
self.id = generate_key_id()
|
||||||
self.creation_date = unix_time()
|
self.creation_date = unix_time()
|
||||||
self.policy = policy
|
self.policy = policy
|
||||||
@ -142,19 +133,14 @@ class KmsBackend(BaseBackend):
|
|||||||
self.key_to_aliases = defaultdict(set)
|
self.key_to_aliases = defaultdict(set)
|
||||||
self.tagger = TaggingService(keyName='TagKey', valueName='TagValue')
|
self.tagger = TaggingService(keyName='TagKey', valueName='TagValue')
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
def create_key(self, policy, key_usage, description, tags, region):
|
|
||||||
key = Key(policy, key_usage, description, region)
|
|
||||||
=======
|
|
||||||
def create_key(
|
def create_key(
|
||||||
self, policy, key_usage, customer_master_key_spec, description, tags, region
|
self, policy, key_usage, customer_master_key_spec, description, tags, region
|
||||||
):
|
):
|
||||||
key = Key(
|
key = Key(
|
||||||
policy, key_usage, customer_master_key_spec, description, tags, region
|
policy, key_usage, customer_master_key_spec, description, region
|
||||||
)
|
)
|
||||||
>>>>>>> 100dbd529f174f18d579a1dcc066d55409f2e38f
|
|
||||||
self.keys[key.id] = key
|
self.keys[key.id] = key
|
||||||
if tags != None and len(tags) > 0:
|
if tags is not None and len(tags) > 0:
|
||||||
self.tag_resource(key.id, tags)
|
self.tag_resource(key.id, tags)
|
||||||
return key
|
return key
|
||||||
|
|
||||||
@ -166,6 +152,7 @@ class KmsBackend(BaseBackend):
|
|||||||
if key_id in self.keys:
|
if key_id in self.keys:
|
||||||
if key_id in self.key_to_aliases:
|
if key_id in self.key_to_aliases:
|
||||||
self.key_to_aliases.pop(key_id)
|
self.key_to_aliases.pop(key_id)
|
||||||
|
self.tagger.delete_all_tags_for_resource(key_id)
|
||||||
|
|
||||||
return self.keys.pop(key_id)
|
return self.keys.pop(key_id)
|
||||||
|
|
||||||
|
@ -680,3 +680,46 @@ def test__assert_default_policy():
|
|||||||
_assert_default_policy.when.called_with("default").should_not.throw(
|
_assert_default_policy.when.called_with("default").should_not.throw(
|
||||||
MotoNotFoundException
|
MotoNotFoundException
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
def test_key_tagging_happy():
|
||||||
|
client = boto3.client("kms", region_name="us-east-1")
|
||||||
|
key = client.create_key(Description="test-key-tagging")
|
||||||
|
key_id = key["KeyMetadata"]["KeyId"]
|
||||||
|
|
||||||
|
tags = [{"TagKey": "key1", "TagValue": "value1"}, {"TagKey": "key2", "TagValue": "value2"}]
|
||||||
|
client.tag_resource(KeyId=key_id, Tags=tags)
|
||||||
|
|
||||||
|
result = client.list_resource_tags(KeyId=key_id)
|
||||||
|
actual = result.get("Tags", [])
|
||||||
|
assert tags == actual
|
||||||
|
|
||||||
|
client.untag_resource(KeyId=key_id, TagKeys=["key1"])
|
||||||
|
|
||||||
|
actual = client.list_resource_tags(KeyId=key_id).get("Tags", [])
|
||||||
|
expected = [{"TagKey": "key2", "TagValue": "value2"}]
|
||||||
|
assert expected == actual
|
||||||
|
|
||||||
|
|
||||||
|
@mock_kms
|
||||||
|
def test_key_tagging_sad():
|
||||||
|
b = KmsBackend()
|
||||||
|
|
||||||
|
try:
|
||||||
|
b.tag_resource('unknown', [])
|
||||||
|
raise 'tag_resource should fail if KeyId is not known'
|
||||||
|
except JsonRESTError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
b.untag_resource('unknown', [])
|
||||||
|
raise 'untag_resource should fail if KeyId is not known'
|
||||||
|
except JsonRESTError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
b.list_resource_tags('unknown')
|
||||||
|
raise 'list_resource_tags should fail if KeyId is not known'
|
||||||
|
except JsonRESTError:
|
||||||
|
pass
|
||||||
|
@ -102,7 +102,7 @@ def test_deserialize_ciphertext_blob(raw, serialized):
|
|||||||
@parameterized(((ec[0],) for ec in ENCRYPTION_CONTEXT_VECTORS))
|
@parameterized(((ec[0],) for ec in ENCRYPTION_CONTEXT_VECTORS))
|
||||||
def test_encrypt_decrypt_cycle(encryption_context):
|
def test_encrypt_decrypt_cycle(encryption_context):
|
||||||
plaintext = b"some secret plaintext"
|
plaintext = b"some secret plaintext"
|
||||||
master_key = Key("nop", "nop", "nop", "nop", [], "nop")
|
master_key = Key("nop", "nop", "nop", "nop", "nop")
|
||||||
master_key_map = {master_key.id: master_key}
|
master_key_map = {master_key.id: master_key}
|
||||||
|
|
||||||
ciphertext_blob = encrypt(
|
ciphertext_blob = encrypt(
|
||||||
@ -133,7 +133,7 @@ def test_encrypt_unknown_key_id():
|
|||||||
|
|
||||||
|
|
||||||
def test_decrypt_invalid_ciphertext_format():
|
def test_decrypt_invalid_ciphertext_format():
|
||||||
master_key = Key("nop", "nop", "nop", "nop", [], "nop")
|
master_key = Key("nop", "nop", "nop", "nop", "nop")
|
||||||
master_key_map = {master_key.id: master_key}
|
master_key_map = {master_key.id: master_key}
|
||||||
|
|
||||||
with assert_raises(InvalidCiphertextException):
|
with assert_raises(InvalidCiphertextException):
|
||||||
@ -153,7 +153,7 @@ def test_decrypt_unknwown_key_id():
|
|||||||
|
|
||||||
|
|
||||||
def test_decrypt_invalid_ciphertext():
|
def test_decrypt_invalid_ciphertext():
|
||||||
master_key = Key("nop", "nop", "nop", "nop", [], "nop")
|
master_key = Key("nop", "nop", "nop", "nop", "nop")
|
||||||
master_key_map = {master_key.id: master_key}
|
master_key_map = {master_key.id: master_key}
|
||||||
ciphertext_blob = (
|
ciphertext_blob = (
|
||||||
master_key.id.encode("utf-8") + b"123456789012"
|
master_key.id.encode("utf-8") + b"123456789012"
|
||||||
@ -171,7 +171,7 @@ def test_decrypt_invalid_ciphertext():
|
|||||||
|
|
||||||
def test_decrypt_invalid_encryption_context():
|
def test_decrypt_invalid_encryption_context():
|
||||||
plaintext = b"some secret plaintext"
|
plaintext = b"some secret plaintext"
|
||||||
master_key = Key("nop", "nop", "nop", "nop", [], "nop")
|
master_key = Key("nop", "nop", "nop", "nop", "nop")
|
||||||
master_key_map = {master_key.id: master_key}
|
master_key_map = {master_key.id: master_key}
|
||||||
|
|
||||||
ciphertext_blob = encrypt(
|
ciphertext_blob = encrypt(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user