fix test cases, bug when no tags are present and conflict

This commit is contained in:
Brady 2020-02-19 09:12:13 -05:00
parent 38413577fc
commit 1221d2653a
6 changed files with 61 additions and 50 deletions

View File

@ -367,7 +367,7 @@ class EventsBackend(BaseBackend):
self.event_buses.pop(name, None) self.event_buses.pop(name, None)
def list_tags_for_resource(self, arn): def list_tags_for_resource(self, arn):
name = arn.split('/')[-1] name = arn.split("/")[-1]
if name in self.rules: if name in self.rules:
return self.tagger.list_tags_for_resource(self.rules[name].arn) return self.tagger.list_tags_for_resource(self.rules[name].arn)
raise JsonRESTError( raise JsonRESTError(

View File

@ -111,7 +111,7 @@ class Key(BaseModel):
key_usage="ENCRYPT_DECRYPT", key_usage="ENCRYPT_DECRYPT",
customer_master_key_spec="SYMMETRIC_DEFAULT", customer_master_key_spec="SYMMETRIC_DEFAULT",
description=properties["Description"], description=properties["Description"],
tags=properties["Tags"], tags=properties.get("Tags", []),
region=region_name, region=region_name,
) )
key.key_rotation_status = properties["EnableKeyRotation"] key.key_rotation_status = properties["EnableKeyRotation"]
@ -131,14 +131,12 @@ class KmsBackend(BaseBackend):
def __init__(self): def __init__(self):
self.keys = {} self.keys = {}
self.key_to_aliases = defaultdict(set) self.key_to_aliases = defaultdict(set)
self.tagger = TaggingService(keyName='TagKey', valueName='TagValue') self.tagger = TaggingService(keyName="TagKey", valueName="TagValue")
def create_key( def create_key(
self, policy, key_usage, customer_master_key_spec, description, tags, region self, policy, key_usage, customer_master_key_spec, description, tags, region
): ):
key = Key( key = Key(policy, key_usage, customer_master_key_spec, description, region)
policy, key_usage, customer_master_key_spec, description, region
)
self.keys[key.id] = key self.keys[key.id] = key
if tags is not None and len(tags) > 0: if tags is not None and len(tags) > 0:
self.tag_resource(key.id, tags) self.tag_resource(key.id, tags)
@ -326,7 +324,8 @@ class KmsBackend(BaseBackend):
if key_id in self.keys: if key_id in self.keys:
return self.tagger.list_tags_for_resource(key_id) return self.tagger.list_tags_for_resource(key_id)
raise JsonRESTError( raise JsonRESTError(
"NotFoundException", "The request was rejected because the specified entity or resource could not be found." "NotFoundException",
"The request was rejected because the specified entity or resource could not be found.",
) )
def tag_resource(self, key_id, tags): def tag_resource(self, key_id, tags):
@ -334,7 +333,8 @@ class KmsBackend(BaseBackend):
self.tagger.tag_resource(key_id, tags) self.tagger.tag_resource(key_id, tags)
return {} return {}
raise JsonRESTError( raise JsonRESTError(
"NotFoundException", "The request was rejected because the specified entity or resource could not be found." "NotFoundException",
"The request was rejected because the specified entity or resource could not be found.",
) )
def untag_resource(self, key_id, tag_names): def untag_resource(self, key_id, tag_names):
@ -342,7 +342,8 @@ class KmsBackend(BaseBackend):
self.tagger.untag_resource_using_names(key_id, tag_names) self.tagger.untag_resource_using_names(key_id, tag_names)
return {} return {}
raise JsonRESTError( raise JsonRESTError(
"NotFoundException", "The request was rejected because the specified entity or resource could not be found." "NotFoundException",
"The request was rejected because the specified entity or resource could not be found.",
) )

View File

@ -318,7 +318,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
# KMS # KMS
def get_kms_tags(kms_key_id): def get_kms_tags(kms_key_id):
result = [] result = []
for tag in self.kms_backend.list_resource_tags(kms_key_id).get("Tags",[]): for tag in self.kms_backend.list_resource_tags(kms_key_id).get("Tags", []):
result.append({"Key": tag["TagKey"], "Value": tag["TagValue"]}) result.append({"Key": tag["TagKey"], "Value": tag["TagValue"]})
return result return result

View File

@ -1,7 +1,7 @@
[nosetests] [nosetests]
verbosity=1 verbosity=1
detailed-errors=1 detailed-errors=1
with-coverage=1 #with-coverage=1
cover-package=moto cover-package=moto
[bdist_wheel] [bdist_wheel]

View File

@ -2,8 +2,10 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import base64 import base64
import re import re
from collections import OrderedDict
import boto.kms import boto.kms
import boto3
import six import six
import sure # noqa import sure # noqa
from boto.exception import JSONResponseError from boto.exception import JSONResponseError
@ -13,7 +15,7 @@ from parameterized import parameterized
from moto.core.exceptions import JsonRESTError from moto.core.exceptions import JsonRESTError
from moto.kms.models import KmsBackend from moto.kms.models import KmsBackend
from moto.kms.exceptions import NotFoundException as MotoNotFoundException from moto.kms.exceptions import NotFoundException as MotoNotFoundException
from moto import mock_kms_deprecated from moto import mock_kms_deprecated, mock_kms
PLAINTEXT_VECTORS = ( PLAINTEXT_VECTORS = (
(b"some encodeable plaintext",), (b"some encodeable plaintext",),
@ -682,24 +684,55 @@ def test__assert_default_policy():
) )
@mock_kms_deprecated if six.PY2:
def test_key_tagging_happy(): sort = sorted
client = boto3.client("kms", region_name="us-east-1") else:
key = client.create_key(Description="test-key-tagging") sort = lambda l: sorted(l, key=lambda d: d.keys())
key_id = key["KeyMetadata"]["KeyId"]
tags = [{"TagKey": "key1", "TagValue": "value1"}, {"TagKey": "key2", "TagValue": "value2"}]
client.tag_resource(KeyId=key_id, Tags=tags) @mock_kms
def test_key_tag_on_create_key_happy():
client = boto3.client("kms", region_name="us-east-1")
tags = [
{"TagKey": "key1", "TagValue": "value1"},
{"TagKey": "key2", "TagValue": "value2"},
]
key = client.create_key(Description="test-key-tagging", Tags=tags)
key_id = key["KeyMetadata"]["KeyId"]
result = client.list_resource_tags(KeyId=key_id) result = client.list_resource_tags(KeyId=key_id)
actual = result.get("Tags", []) actual = result.get("Tags", [])
assert tags == actual assert sort(tags) == sort(actual)
client.untag_resource(KeyId=key_id, TagKeys=["key1"]) client.untag_resource(KeyId=key_id, TagKeys=["key1"])
actual = client.list_resource_tags(KeyId=key_id).get("Tags", []) actual = client.list_resource_tags(KeyId=key_id).get("Tags", [])
expected = [{"TagKey": "key2", "TagValue": "value2"}] expected = [{"TagKey": "key2", "TagValue": "value2"}]
assert expected == actual assert sort(expected) == sort(actual)
@mock_kms
def test_key_tag_added_happy():
client = boto3.client("kms", region_name="us-east-1")
key = client.create_key(Description="test-key-tagging")
key_id = key["KeyMetadata"]["KeyId"]
tags = [
{"TagKey": "key1", "TagValue": "value1"},
{"TagKey": "key2", "TagValue": "value2"},
]
client.tag_resource(KeyId=key_id, Tags=tags)
result = client.list_resource_tags(KeyId=key_id)
actual = result.get("Tags", [])
assert sort(tags) == sort(actual)
client.untag_resource(KeyId=key_id, TagKeys=["key1"])
actual = client.list_resource_tags(KeyId=key_id).get("Tags", [])
expected = [{"TagKey": "key2", "TagValue": "value2"}]
assert sort(expected) == sort(actual)
@mock_kms_deprecated @mock_kms_deprecated
@ -707,19 +740,19 @@ def test_key_tagging_sad():
b = KmsBackend() b = KmsBackend()
try: try:
b.tag_resource('unknown', []) b.tag_resource("unknown", [])
raise 'tag_resource should fail if KeyId is not known' raise "tag_resource should fail if KeyId is not known"
except JsonRESTError: except JsonRESTError:
pass pass
try: try:
b.untag_resource('unknown', []) b.untag_resource("unknown", [])
raise 'untag_resource should fail if KeyId is not known' raise "untag_resource should fail if KeyId is not known"
except JsonRESTError: except JsonRESTError:
pass pass
try: try:
b.list_resource_tags('unknown') b.list_resource_tags("unknown")
raise 'list_resource_tags should fail if KeyId is not known' raise "list_resource_tags should fail if KeyId is not known"
except JsonRESTError: except JsonRESTError:
pass pass

View File

@ -248,26 +248,3 @@ def test_get_many_resources():
) )
# TODO test pagenation # TODO test pagenation
@mock_kms
def test_get_kms_tags():
kms = boto3.client("kms", region_name="us-east-1")
key = kms.create_key(
KeyUsage="ENCRYPT_DECRYPT",
Tags=[
{"TagKey": "key_name", "TagValue": "a_value"},
{"TagKey": "key_2", "TagValue": "val2"},
],
)
key_id = key["KeyMetadata"]["KeyId"]
rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-east-1")
resp = rtapi.get_resources(
ResourceTypeFilters=["kms"],
TagFilters=[{"Key": "key_name"}],
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["Tags"].should.contain(
{"Key": "key_name", "Value": "a_value"}
)