adds tagging support for cloudwatch events service
This commit is contained in:
parent
d596560971
commit
6cb0428d20
@ -6,6 +6,7 @@ from boto3 import Session
|
|||||||
from moto.core.exceptions import JsonRESTError
|
from moto.core.exceptions import JsonRESTError
|
||||||
from moto.core import BaseBackend, BaseModel
|
from moto.core import BaseBackend, BaseModel
|
||||||
from moto.sts.models import ACCOUNT_ID
|
from moto.sts.models import ACCOUNT_ID
|
||||||
|
from moto.utilities.tagging_service import TaggingService
|
||||||
|
|
||||||
|
|
||||||
class Rule(BaseModel):
|
class Rule(BaseModel):
|
||||||
@ -104,6 +105,7 @@ class EventsBackend(BaseBackend):
|
|||||||
self.region_name = region_name
|
self.region_name = region_name
|
||||||
self.event_buses = {}
|
self.event_buses = {}
|
||||||
self.event_sources = {}
|
self.event_sources = {}
|
||||||
|
self.tagger = TaggingService()
|
||||||
|
|
||||||
self._add_default_event_bus()
|
self._add_default_event_bus()
|
||||||
|
|
||||||
@ -361,6 +363,31 @@ class EventsBackend(BaseBackend):
|
|||||||
|
|
||||||
self.event_buses.pop(name, None)
|
self.event_buses.pop(name, None)
|
||||||
|
|
||||||
|
def list_tags_for_resource(self, arn):
|
||||||
|
name = arn.split('/')[-1]
|
||||||
|
if name in self.rules:
|
||||||
|
return self.tagger.list_tags_for_resource(self.rules[name].arn)
|
||||||
|
raise JsonRESTError(
|
||||||
|
"ResourceNotFoundException", "An entity that you specified does not exist."
|
||||||
|
)
|
||||||
|
|
||||||
|
def tag_resource(self, arn, tags):
|
||||||
|
name = arn.split('/')[-1]
|
||||||
|
if name in self.rules:
|
||||||
|
self.tagger.tag_resource(self.rules[name].arn, tags)
|
||||||
|
return {}
|
||||||
|
raise JsonRESTError(
|
||||||
|
"ResourceNotFoundException", "An entity that you specified does not exist."
|
||||||
|
)
|
||||||
|
|
||||||
|
def untag_resource(self, arn, tag_names):
|
||||||
|
name = arn.split('/')[-1]
|
||||||
|
if name in self.rules:
|
||||||
|
self.tagger.untag_resource_using_names(self.rules[name].arn, tag_names)
|
||||||
|
return {}
|
||||||
|
raise JsonRESTError(
|
||||||
|
"ResourceNotFoundException", "An entity that you specified does not exist."
|
||||||
|
)
|
||||||
|
|
||||||
events_backends = {}
|
events_backends = {}
|
||||||
for region in Session().get_available_regions("events"):
|
for region in Session().get_available_regions("events"):
|
||||||
|
@ -297,3 +297,26 @@ class EventsHandler(BaseResponse):
|
|||||||
self.events_backend.delete_event_bus(name)
|
self.events_backend.delete_event_bus(name)
|
||||||
|
|
||||||
return "", self.response_headers
|
return "", self.response_headers
|
||||||
|
|
||||||
|
def list_tags_for_resource(self):
|
||||||
|
arn = self._get_param("ResourceARN")
|
||||||
|
|
||||||
|
result = self.events_backend.list_tags_for_resource(arn)
|
||||||
|
|
||||||
|
return json.dumps(result), self.response_headers
|
||||||
|
|
||||||
|
def tag_resource(self):
|
||||||
|
arn = self._get_param("ResourceARN")
|
||||||
|
tags = self._get_param("Tags")
|
||||||
|
|
||||||
|
result = self.events_backend.tag_resource(arn, tags)
|
||||||
|
|
||||||
|
return json.dumps(result), self.response_headers
|
||||||
|
|
||||||
|
def untag_resource(self):
|
||||||
|
arn = self._get_param("ResourceARN")
|
||||||
|
tags = self._get_param("TagKeys")
|
||||||
|
|
||||||
|
result = self.events_backend.untag_resource(arn, tags)
|
||||||
|
|
||||||
|
return json.dumps(result), self.response_headers
|
||||||
|
56
moto/utilities/tagging_service.py
Normal file
56
moto/utilities/tagging_service.py
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
class TaggingService:
|
||||||
|
def __init__(self, tagName='Tags', keyName='Key', valueName='Value'):
|
||||||
|
self.tagName = tagName
|
||||||
|
self.keyName = keyName
|
||||||
|
self.valueName = valueName
|
||||||
|
self.tags = {}
|
||||||
|
|
||||||
|
def list_tags_for_resource(self, arn):
|
||||||
|
result = []
|
||||||
|
if arn in self.tags:
|
||||||
|
for k, v in self.tags[arn].items():
|
||||||
|
result.append({self.keyName: k, self.valueName: v})
|
||||||
|
return {self.tagName: result}
|
||||||
|
|
||||||
|
def tag_resource(self, arn, tags):
|
||||||
|
if arn not in self.tags:
|
||||||
|
self.tags[arn] = {}
|
||||||
|
for t in tags:
|
||||||
|
if self.valueName in t:
|
||||||
|
self.tags[arn][t[self.keyName]] = t[self.valueName]
|
||||||
|
else:
|
||||||
|
self.tags[arn][t[self.keyName]] = None
|
||||||
|
|
||||||
|
def untag_resource_using_names(self, arn, tag_names):
|
||||||
|
for name in tag_names:
|
||||||
|
if name in self.tags.get(arn, {}):
|
||||||
|
del self.tags[arn][name]
|
||||||
|
|
||||||
|
def untag_resource_using_tags(self, arn, tags):
|
||||||
|
m = self.tags.get(arn, {})
|
||||||
|
for t in tags:
|
||||||
|
if self.keyName in t:
|
||||||
|
if t[self.keyName] in m:
|
||||||
|
if self.valueName in t:
|
||||||
|
if m[t[self.keyName]] != t[self.valueName]:
|
||||||
|
continue
|
||||||
|
# If both key and value are provided, match both before deletion
|
||||||
|
del m[t[self.keyName]]
|
||||||
|
|
||||||
|
def extract_tag_names(self, tags):
|
||||||
|
results = []
|
||||||
|
if len(tags) == 0:
|
||||||
|
return results
|
||||||
|
for tag in tags:
|
||||||
|
if self.keyName in tag:
|
||||||
|
results.append(tag[self.keyName])
|
||||||
|
return results
|
||||||
|
|
||||||
|
def flatten_tag_list(self, tags):
|
||||||
|
result = {}
|
||||||
|
for t in tags:
|
||||||
|
if self.valueName in t:
|
||||||
|
result[t[self.keyName]] = t[self.valueName]
|
||||||
|
else:
|
||||||
|
result[t[self.keyName]] = None
|
||||||
|
return result
|
@ -5,8 +5,10 @@ import sure # noqa
|
|||||||
|
|
||||||
from moto.events import mock_events
|
from moto.events import mock_events
|
||||||
from botocore.exceptions import ClientError
|
from botocore.exceptions import ClientError
|
||||||
|
from moto.core.exceptions import JsonRESTError
|
||||||
from nose.tools import assert_raises
|
from nose.tools import assert_raises
|
||||||
from moto.core import ACCOUNT_ID
|
from moto.core import ACCOUNT_ID
|
||||||
|
from moto.events.models import EventsBackend
|
||||||
|
|
||||||
RULES = [
|
RULES = [
|
||||||
{"Name": "test1", "ScheduleExpression": "rate(5 minutes)"},
|
{"Name": "test1", "ScheduleExpression": "rate(5 minutes)"},
|
||||||
@ -136,14 +138,6 @@ def test_list_rule_names_by_target():
|
|||||||
assert rule in test_2_target["Rules"]
|
assert rule in test_2_target["Rules"]
|
||||||
|
|
||||||
|
|
||||||
@mock_events
|
|
||||||
def test_list_rules():
|
|
||||||
client = generate_environment()
|
|
||||||
|
|
||||||
rules = client.list_rules()
|
|
||||||
assert len(rules["Rules"]) == len(RULES)
|
|
||||||
|
|
||||||
|
|
||||||
@mock_events
|
@mock_events
|
||||||
def test_delete_rule():
|
def test_delete_rule():
|
||||||
client = generate_environment()
|
client = generate_environment()
|
||||||
@ -461,3 +455,43 @@ def test_delete_event_bus_errors():
|
|||||||
client.delete_event_bus.when.called_with(Name="default").should.throw(
|
client.delete_event_bus.when.called_with(Name="default").should.throw(
|
||||||
ClientError, "Cannot delete event bus default."
|
ClientError, "Cannot delete event bus default."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@mock_events
|
||||||
|
def test_rule_tagging_happy():
|
||||||
|
client = generate_environment()
|
||||||
|
rule_name = get_random_rule()["Name"]
|
||||||
|
rule_arn = client.describe_rule(Name=rule_name).get("Arn")
|
||||||
|
|
||||||
|
tags = [{"Key": "key1", "Value": "value1"}, {"Key": "key2", "Value": "value2"}]
|
||||||
|
client.tag_resource(ResourceARN=rule_arn, Tags=tags)
|
||||||
|
|
||||||
|
actual = client.list_tags_for_resource(ResourceARN=rule_arn).get("Tags")
|
||||||
|
assert tags == actual
|
||||||
|
|
||||||
|
client.untag_resource(ResourceARN=rule_arn, TagKeys=["key1"])
|
||||||
|
|
||||||
|
actual = client.list_tags_for_resource(ResourceARN=rule_arn).get("Tags")
|
||||||
|
expected = [{"Key": "key2", "Value": "value2"}]
|
||||||
|
assert expected == actual
|
||||||
|
|
||||||
|
@mock_events
|
||||||
|
def test_rule_tagging_sad():
|
||||||
|
b = EventsBackend("us-west-2")
|
||||||
|
|
||||||
|
try:
|
||||||
|
b.tag_resource('unknown', [])
|
||||||
|
raise 'tag_resource should fail if ResourceARN is not known'
|
||||||
|
except JsonRESTError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
b.untag_resource('unknown', [])
|
||||||
|
raise 'untag_resource should fail if ResourceARN is not known'
|
||||||
|
except JsonRESTError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
b.list_tags_for_resource('unknown')
|
||||||
|
raise 'list_tags_for_resource should fail if ResourceARN is not known'
|
||||||
|
except JsonRESTError:
|
||||||
|
pass
|
53
tests/test_utilities/test_tagging_service.py
Normal file
53
tests/test_utilities/test_tagging_service.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
from moto.utilities.tagging_service import TaggingService
|
||||||
|
|
||||||
|
|
||||||
|
class TestTaggingService(unittest.TestCase):
|
||||||
|
def test_list_empty(self):
|
||||||
|
svc = TaggingService()
|
||||||
|
result = svc.list_tags_for_resource('test')
|
||||||
|
self.assertEqual(result, {'Tags': []})
|
||||||
|
|
||||||
|
def test_create_tag(self):
|
||||||
|
svc = TaggingService('TheTags', 'TagKey', 'TagValue')
|
||||||
|
tags = [{'TagKey': 'key_key', 'TagValue': 'value_value'}]
|
||||||
|
svc.tag_resource('arn', tags)
|
||||||
|
actual = svc.list_tags_for_resource('arn')
|
||||||
|
expected = {'TheTags': [{'TagKey': 'key_key', 'TagValue': 'value_value'}]}
|
||||||
|
self.assertDictEqual(expected, actual)
|
||||||
|
|
||||||
|
def test_create_tag_without_value(self):
|
||||||
|
svc = TaggingService()
|
||||||
|
tags = [{'Key': 'key_key'}]
|
||||||
|
svc.tag_resource('arn', tags)
|
||||||
|
actual = svc.list_tags_for_resource('arn')
|
||||||
|
expected = {'Tags': [{'Key': 'key_key', 'Value': ''}]}
|
||||||
|
self.assertDictEqual(expected, actual)
|
||||||
|
|
||||||
|
def test_delete_tag(self):
|
||||||
|
svc = TaggingService()
|
||||||
|
tags = [{'Key': 'key_key', 'Value': 'value_value'}]
|
||||||
|
svc.tag_resource('arn', tags)
|
||||||
|
svc.untag_resource('arn', ['key_key'])
|
||||||
|
result = svc.list_tags_for_resource('arn')
|
||||||
|
self.assertEqual(
|
||||||
|
result, {'Tags': []})
|
||||||
|
|
||||||
|
def test_list_empty_delete(self):
|
||||||
|
svc = TaggingService()
|
||||||
|
svc.untag_resource('arn', ['key_key'])
|
||||||
|
result = svc.list_tags_for_resource('arn')
|
||||||
|
self.assertEqual(
|
||||||
|
result, {'Tags': []})
|
||||||
|
|
||||||
|
def test_extract_tag_names(self):
|
||||||
|
svc = TaggingService()
|
||||||
|
tags = [{'Key': 'key1', 'Value': 'value1'}, {'Key': 'key2', 'Value': 'value2'}]
|
||||||
|
actual = svc.extract_tag_names(tags)
|
||||||
|
expected = ['key1', 'key2']
|
||||||
|
self.assertEqual(expected, actual)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Loading…
Reference in New Issue
Block a user