This commit is contained in:
stromp 2021-09-22 18:35:50 +02:00 committed by GitHub
parent f7d490167b
commit 097a260dce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 18 deletions

View File

@ -346,7 +346,8 @@ class KmsBackend(BaseBackend):
return plaintext, ciphertext_blob, arn return plaintext, ciphertext_blob, arn
def list_resource_tags(self, key_id): def list_resource_tags(self, key_id_or_arn):
key_id = self.get_key_id(key_id_or_arn)
if key_id in self.keys: if key_id in self.keys:
return self.tagger.list_tags_for_resource(key_id) return self.tagger.list_tags_for_resource(key_id)
raise JsonRESTError( raise JsonRESTError(
@ -354,7 +355,8 @@ class KmsBackend(BaseBackend):
"The request was rejected because the specified entity or resource could not be found.", "The request was rejected because the specified entity or resource could not be found.",
) )
def tag_resource(self, key_id, tags): def tag_resource(self, key_id_or_arn, tags):
key_id = self.get_key_id(key_id_or_arn)
if key_id in self.keys: if key_id in self.keys:
self.tagger.tag_resource(key_id, tags) self.tagger.tag_resource(key_id, tags)
return {} return {}
@ -363,7 +365,8 @@ class KmsBackend(BaseBackend):
"The request was rejected because the specified entity or resource could not be found.", "The request was rejected because the specified entity or resource could not be found.",
) )
def untag_resource(self, key_id, tag_names): def untag_resource(self, key_id_or_arn, tag_names):
key_id = self.get_key_id(key_id_or_arn)
if key_id in self.keys: if key_id in self.keys:
self.tagger.untag_resource_using_names(key_id, tag_names) self.tagger.untag_resource_using_names(key_id, tag_names)
return {} return {}

View File

@ -671,6 +671,18 @@ def test__assert_default_policy():
sort = lambda l: sorted(l, key=lambda d: d.keys()) sort = lambda l: sorted(l, key=lambda d: d.keys())
def _check_tags(key_id, created_tags, client):
result = client.list_resource_tags(KeyId=key_id)
actual = result.get("Tags", [])
assert sort(created_tags) == sort(actual)
client.untag_resource(KeyId=key_id, TagKeys=["key1"])
actual = client.list_resource_tags(KeyId=key_id).get("Tags", [])
expected = [{"TagKey": "key2", "TagValue": "value2"}]
assert sort(expected) == sort(actual)
@mock_kms @mock_kms
def test_key_tag_on_create_key_happy(): def test_key_tag_on_create_key_happy():
client = boto3.client("kms", region_name="us-east-1") client = boto3.client("kms", region_name="us-east-1")
@ -680,17 +692,19 @@ def test_key_tag_on_create_key_happy():
{"TagKey": "key2", "TagValue": "value2"}, {"TagKey": "key2", "TagValue": "value2"},
] ]
key = client.create_key(Description="test-key-tagging", Tags=tags) key = client.create_key(Description="test-key-tagging", Tags=tags)
key_id = key["KeyMetadata"]["KeyId"] _check_tags(key["KeyMetadata"]["KeyId"], tags, client)
result = client.list_resource_tags(KeyId=key_id)
actual = result.get("Tags", [])
assert sort(tags) == sort(actual)
client.untag_resource(KeyId=key_id, TagKeys=["key1"]) @mock_kms
def test_key_tag_on_create_key_on_arn_happy():
client = boto3.client("kms", region_name="us-east-1")
actual = client.list_resource_tags(KeyId=key_id).get("Tags", []) tags = [
expected = [{"TagKey": "key2", "TagValue": "value2"}] {"TagKey": "key1", "TagValue": "value1"},
assert sort(expected) == sort(actual) {"TagKey": "key2", "TagValue": "value2"},
]
key = client.create_key(Description="test-key-tagging", Tags=tags)
_check_tags(key["KeyMetadata"]["Arn"], tags, client)
@mock_kms @mock_kms
@ -704,16 +718,21 @@ def test_key_tag_added_happy():
{"TagKey": "key2", "TagValue": "value2"}, {"TagKey": "key2", "TagValue": "value2"},
] ]
client.tag_resource(KeyId=key_id, Tags=tags) client.tag_resource(KeyId=key_id, Tags=tags)
_check_tags(key_id, tags, client)
result = client.list_resource_tags(KeyId=key_id)
actual = result.get("Tags", [])
assert sort(tags) == sort(actual)
client.untag_resource(KeyId=key_id, TagKeys=["key1"]) @mock_kms
def test_key_tag_added_arn_based_happy():
client = boto3.client("kms", region_name="us-east-1")
actual = client.list_resource_tags(KeyId=key_id).get("Tags", []) key = client.create_key(Description="test-key-tagging")
expected = [{"TagKey": "key2", "TagValue": "value2"}] key_id = key["KeyMetadata"]["Arn"]
assert sort(expected) == sort(actual) tags = [
{"TagKey": "key1", "TagValue": "value1"},
{"TagKey": "key2", "TagValue": "value2"},
]
client.tag_resource(KeyId=key_id, Tags=tags)
_check_tags(key_id, tags, client)
@mock_kms_deprecated @mock_kms_deprecated

View File

@ -398,6 +398,20 @@ def test_list_resource_tags():
assert response["Tags"][0]["TagValue"] == "string" assert response["Tags"][0]["TagValue"] == "string"
@mock_kms
def test_list_resource_tags_with_arn():
client = boto3.client("kms", region_name="us-east-1")
key = client.create_key(Description="cancel-key-deletion")
client.schedule_key_deletion(KeyId=key["KeyMetadata"]["KeyId"])
keyid = key["KeyMetadata"]["Arn"]
client.tag_resource(KeyId=keyid, Tags=[{"TagKey": "string", "TagValue": "string"}])
response = client.list_resource_tags(KeyId=keyid)
assert response["Tags"][0]["TagKey"] == "string"
assert response["Tags"][0]["TagValue"] == "string"
@pytest.mark.parametrize( @pytest.mark.parametrize(
"kwargs,expected_key_length", "kwargs,expected_key_length",
( (