add tagging support to events

This commit is contained in:
Brady 2020-01-16 21:00:24 -05:00
parent d596560971
commit eaa8c8db6e
6 changed files with 228 additions and 12 deletions

View File

@ -6,6 +6,7 @@ from boto3 import Session
from moto.core.exceptions import JsonRESTError
from moto.core import BaseBackend, BaseModel
from moto.sts.models import ACCOUNT_ID
from moto.utilities.tagging_service import TaggingService
class Rule(BaseModel):
@ -104,6 +105,7 @@ class EventsBackend(BaseBackend):
self.region_name = region_name
self.event_buses = {}
self.event_sources = {}
self.tagger = TaggingService()
self._add_default_event_bus()
@ -361,6 +363,32 @@ class EventsBackend(BaseBackend):
self.event_buses.pop(name, None)
def list_tags_for_resource(self, arn):
name = arn.split("/")[-1]
if name in self.rules:
return self.tagger.list_tags_for_resource(self.rules[name].arn)
raise JsonRESTError(
"ResourceNotFoundException", "An entity that you specified does not exist."
)
def tag_resource(self, arn, tags):
name = arn.split("/")[-1]
if name in self.rules:
self.tagger.tag_resource(self.rules[name].arn, tags)
return {}
raise JsonRESTError(
"ResourceNotFoundException", "An entity that you specified does not exist."
)
def untag_resource(self, arn, tag_names):
name = arn.split("/")[-1]
if name in self.rules:
self.tagger.untag_resource_using_names(self.rules[name].arn, tag_names)
return {}
raise JsonRESTError(
"ResourceNotFoundException", "An entity that you specified does not exist."
)
events_backends = {}
for region in Session().get_available_regions("events"):

View File

@ -297,3 +297,26 @@ class EventsHandler(BaseResponse):
self.events_backend.delete_event_bus(name)
return "", self.response_headers
def list_tags_for_resource(self):
arn = self._get_param("ResourceARN")
result = self.events_backend.list_tags_for_resource(arn)
return json.dumps(result), self.response_headers
def tag_resource(self):
arn = self._get_param("ResourceARN")
tags = self._get_param("Tags")
result = self.events_backend.tag_resource(arn, tags)
return json.dumps(result), self.response_headers
def untag_resource(self):
arn = self._get_param("ResourceARN")
tags = self._get_param("TagKeys")
result = self.events_backend.untag_resource(arn, tags)
return json.dumps(result), self.response_headers

View File

View File

@ -0,0 +1,56 @@
class TaggingService:
def __init__(self, tagName="Tags", keyName="Key", valueName="Value"):
self.tagName = tagName
self.keyName = keyName
self.valueName = valueName
self.tags = {}
def list_tags_for_resource(self, arn):
result = []
if arn in self.tags:
for k, v in self.tags[arn].items():
result.append({self.keyName: k, self.valueName: v})
return {self.tagName: result}
def tag_resource(self, arn, tags):
if arn not in self.tags:
self.tags[arn] = {}
for t in tags:
if self.valueName in t:
self.tags[arn][t[self.keyName]] = t[self.valueName]
else:
self.tags[arn][t[self.keyName]] = None
def untag_resource_using_names(self, arn, tag_names):
for name in tag_names:
if name in self.tags.get(arn, {}):
del self.tags[arn][name]
def untag_resource_using_tags(self, arn, tags):
m = self.tags.get(arn, {})
for t in tags:
if self.keyName in t:
if t[self.keyName] in m:
if self.valueName in t:
if m[t[self.keyName]] != t[self.valueName]:
continue
# If both key and value are provided, match both before deletion
del m[t[self.keyName]]
def extract_tag_names(self, tags):
results = []
if len(tags) == 0:
return results
for tag in tags:
if self.keyName in tag:
results.append(tag[self.keyName])
return results
def flatten_tag_list(self, tags):
result = {}
for t in tags:
if self.valueName in t:
result[t[self.keyName]] = t[self.valueName]
else:
result[t[self.keyName]] = None
return result

View File

@ -1,12 +1,15 @@
import random
import boto3
import json
import sure # noqa
import random
import unittest
from moto.events import mock_events
import boto3
from botocore.exceptions import ClientError
from nose.tools import assert_raises
from moto.core import ACCOUNT_ID
from moto.core.exceptions import JsonRESTError
from moto.events import mock_events
from moto.events.models import EventsBackend
RULES = [
{"Name": "test1", "ScheduleExpression": "rate(5 minutes)"},
@ -136,14 +139,6 @@ def test_list_rule_names_by_target():
assert rule in test_2_target["Rules"]
@mock_events
def test_list_rules():
client = generate_environment()
rules = client.list_rules()
assert len(rules["Rules"]) == len(RULES)
@mock_events
def test_delete_rule():
client = generate_environment()
@ -461,3 +456,58 @@ def test_delete_event_bus_errors():
client.delete_event_bus.when.called_with(Name="default").should.throw(
ClientError, "Cannot delete event bus default."
)
@mock_events
def test_rule_tagging_happy():
client = generate_environment()
rule_name = get_random_rule()["Name"]
rule_arn = client.describe_rule(Name=rule_name).get("Arn")
tags = [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
client.tag_resource(ResourceARN=rule_arn, Tags=tags)
actual = client.list_tags_for_resource(ResourceARN=rule_arn).get("Tags")
tc = unittest.TestCase("__init__")
expected = [{"Value": "value1", "Key": "key1"}, {"Value": "value2", "Key": "key2"}]
tc.assertTrue(
(expected[0] == actual[0] and expected[1] == actual[1])
or (expected[1] == actual[0] and expected[0] == actual[1])
)
client.untag_resource(ResourceARN=rule_arn, TagKeys=["key1"])
actual = client.list_tags_for_resource(ResourceARN=rule_arn).get("Tags")
expected = [{"Key": "key2", "Value": "value2"}]
assert expected == actual
def freeze_dict(obj):
if isinstance(obj, dict):
dict_items = list(obj.items())
dict_items.append(("__frozen__", True))
return tuple([(k, freeze_dict(v)) for k, v in dict_items])
return obj
@mock_events
def test_rule_tagging_sad():
b = EventsBackend("us-west-2")
try:
b.tag_resource("unknown", [])
raise "tag_resource should fail if ResourceARN is not known"
except JsonRESTError:
pass
try:
b.untag_resource("unknown", [])
raise "untag_resource should fail if ResourceARN is not known"
except JsonRESTError:
pass
try:
b.list_tags_for_resource("unknown")
raise "list_tags_for_resource should fail if ResourceARN is not known"
except JsonRESTError:
pass

View File

@ -0,0 +1,59 @@
import unittest
from moto.utilities.tagging_service import TaggingService
class TestTaggingService(unittest.TestCase):
def test_list_empty(self):
svc = TaggingService()
result = svc.list_tags_for_resource("test")
self.assertEqual(result, {"Tags": []})
def test_create_tag(self):
svc = TaggingService("TheTags", "TagKey", "TagValue")
tags = [{"TagKey": "key_key", "TagValue": "value_value"}]
svc.tag_resource("arn", tags)
actual = svc.list_tags_for_resource("arn")
expected = {"TheTags": [{"TagKey": "key_key", "TagValue": "value_value"}]}
self.assertDictEqual(expected, actual)
def test_create_tag_without_value(self):
svc = TaggingService()
tags = [{"Key": "key_key"}]
svc.tag_resource("arn", tags)
actual = svc.list_tags_for_resource("arn")
expected = {"Tags": [{"Key": "key_key", "Value": None}]}
self.assertDictEqual(expected, actual)
def test_delete_tag_using_names(self):
svc = TaggingService()
tags = [{"Key": "key_key", "Value": "value_value"}]
svc.tag_resource("arn", tags)
svc.untag_resource_using_names("arn", ["key_key"])
result = svc.list_tags_for_resource("arn")
self.assertEqual(result, {"Tags": []})
def test_list_empty_delete(self):
svc = TaggingService()
svc.untag_resource_using_names("arn", ["key_key"])
result = svc.list_tags_for_resource("arn")
self.assertEqual(result, {"Tags": []})
def test_delete_tag_using_tags(self):
svc = TaggingService()
tags = [{"Key": "key_key", "Value": "value_value"}]
svc.tag_resource("arn", tags)
svc.untag_resource_using_tags("arn", tags)
result = svc.list_tags_for_resource("arn")
self.assertEqual(result, {"Tags": []})
def test_extract_tag_names(self):
svc = TaggingService()
tags = [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
actual = svc.extract_tag_names(tags)
expected = ["key1", "key2"]
self.assertEqual(expected, actual)
if __name__ == "__main__":
unittest.main()