feat: implement logs list_tags_log_group

This commit is contained in:
Wayne Metcalfe 2019-09-26 16:20:28 +01:00
parent 9cc6a1533f
commit 3cb7c3e568
4 changed files with 36 additions and 1 deletions

View File

@ -4080,7 +4080,7 @@
- [ ] get_log_group_fields
- [ ] get_log_record
- [ ] get_query_results
- [ ] list_tags_log_group
- [X] list_tags_log_group
- [ ] put_destination
- [ ] put_destination_policy
- [X] put_log_events

View File

@ -231,6 +231,9 @@ class LogGroup:
def set_retention_policy(self, retention_in_days):
self.retentionInDays = retention_in_days
def list_tags(self):
return self.tags if self.tags else {}
class LogsBackend(BaseBackend):
def __init__(self, region_name):
@ -322,5 +325,11 @@ class LogsBackend(BaseBackend):
log_group = self.groups[log_group_name]
return log_group.set_retention_policy(None)
def list_tags_log_group(self, log_group_name):
if log_group_name not in self.groups:
raise ResourceNotFoundException()
log_group = self.groups[log_group_name]
return log_group.list_tags()
logs_backends = {region.name: LogsBackend(region.name) for region in boto.logs.regions()}

View File

@ -134,3 +134,11 @@ class LogsResponse(BaseResponse):
log_group_name = self._get_param('logGroupName')
self.logs_backend.delete_retention_policy(log_group_name)
return ''
def list_tags_log_group(self):
log_group_name = self._get_param('logGroupName')
tags = self.logs_backend.list_tags_log_group(log_group_name)
return json.dumps({
'tags': tags
})

View File

@ -225,3 +225,21 @@ def test_get_log_events():
for i in range(10):
resp['events'][i]['timestamp'].should.equal(i)
resp['events'][i]['message'].should.equal(str(i))
@mock_logs
def test_list_tags_log_group():
conn = boto3.client('logs', 'us-west-2')
log_group_name = 'dummy'
tags = {'tag_key_1': 'tag_value_1', 'tag_key_2': 'tag_value_2'}
response = conn.create_log_group(logGroupName=log_group_name)
response = conn.list_tags_log_group(logGroupName=log_group_name)
assert response['tags'] == {}
response = conn.delete_log_group(logGroupName=log_group_name)
response = conn.create_log_group(logGroupName=log_group_name, tags=tags)
response = conn.list_tags_log_group(logGroupName=log_group_name)
assert response['tags'] == tags
response = conn.delete_log_group(logGroupName=log_group_name)