Add Log Service describe_resource_policies, delete_resource_policy (#4150)

Co-authored-by: Karri Balk <kbalk@users.noreply.github.com>
kbalk authored 2021-08-07 04:04:15 -04:00, committed by GitHub
parent b4ae6a9cce
commit 6b880003c6
5 changed files with 303 additions and 105 deletions

File: IMPLEMENTATION_COVERAGE.md

@ -6901,7 +6901,7 @@
- [X] delete_log_stream
- [ ] delete_metric_filter
- [ ] delete_query_definition
- [ ] delete_resource_policy
- [X] delete_resource_policy
- [X] delete_retention_policy
- [X] delete_subscription_filter
- [ ] describe_destinations
@ -6911,7 +6911,7 @@
- [ ] describe_metric_filters
- [ ] describe_queries
- [ ] describe_query_definitions
- [ ] describe_resource_policies
- [X] describe_resource_policies
- [X] describe_subscription_filters
- [ ] disassociate_kms_key
- [X] filter_log_events
@ -6931,6 +6931,7 @@
- [X] start_query
- [ ] stop_query
- [X] tag_log_group
- [ ] tail
- [ ] test_metric_filter
- [X] untag_log_group
</details>
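
Both boxes above are newly checked by this commit, and the tests added below exercise them end to end. A minimal sketch of the same round trip (region, policy name, and document contents are illustrative):

import json

import boto3
from moto import mock_logs


@mock_logs
def resource_policy_round_trip():
    client = boto3.client("logs", region_name="us-east-1")
    policy_doc = json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": ["route53.amazonaws.com"]},
                    "Action": "logs:PutLogEvents",
                    "Resource": "*",
                }
            ],
        }
    )
    # put_resource_policy was already supported; the next two calls are new.
    client.put_resource_policy(policyName="demo", policyDocument=policy_doc)

    policies = client.describe_resource_policies()["resourcePolicies"]
    assert policies[0]["policyName"] == "demo"

    client.delete_resource_policy(policyName="demo")
    assert not client.describe_resource_policies()["resourcePolicies"]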

File: moto/logs/exceptions.py

@ -9,7 +9,7 @@ class LogsClientError(JsonRESTError):
class ResourceNotFoundException(LogsClientError):
def __init__(self, msg=None):
self.code = 400
super(ResourceNotFoundException, self).__init__(
super().__init__(
"ResourceNotFoundException", msg or "The specified log group does not exist"
)
@ -17,7 +17,7 @@ class ResourceNotFoundException(LogsClientError):
class InvalidParameterException(LogsClientError):
def __init__(self, msg=None):
self.code = 400
super(InvalidParameterException, self).__init__(
super().__init__(
"InvalidParameterException", msg or "A parameter is specified incorrectly."
)
@ -25,7 +25,7 @@ class InvalidParameterException(LogsClientError):
class ResourceAlreadyExistsException(LogsClientError):
def __init__(self):
self.code = 400
super(ResourceAlreadyExistsException, self).__init__(
super().__init__(
"ResourceAlreadyExistsException", "The specified log group already exists"
)
@ -33,6 +33,4 @@ class ResourceAlreadyExistsException(LogsClientError):
class LimitExceededException(LogsClientError):
def __init__(self):
self.code = 400
super(LimitExceededException, self).__init__(
"LimitExceededException", "Resource limit exceeded."
)
super().__init__("LimitExceededException", "Resource limit exceeded.")

File: moto/logs/models.py

@ -1,20 +1,23 @@
import uuid
from boto3 import Session
from moto import core as moto_core
from moto.core import BaseBackend, BaseModel
from moto.core.utils import unix_time_millis
from .exceptions import (
from moto.logs.exceptions import (
ResourceNotFoundException,
ResourceAlreadyExistsException,
InvalidParameterException,
LimitExceededException,
)
import uuid
MAX_RESOURCE_POLICIES_PER_REGION = 10
class LogQuery(BaseModel):
def __init__(self, id, start_time, end_time, query):
self.id = id
def __init__(self, query_id, start_time, end_time, query):
self.query_id = query_id
self.start_time = start_time
self.end_time = end_time
self.query = query
@ -24,16 +27,16 @@ class LogEvent(BaseModel):
_event_id = 0
def __init__(self, ingestion_time, log_event):
self.ingestionTime = ingestion_time
self.ingestion_time = ingestion_time
self.timestamp = log_event["timestamp"]
self.message = log_event["message"]
self.eventId = self.__class__._event_id
self.event_id = self.__class__._event_id
self.__class__._event_id += 1
def to_filter_dict(self):
return {
"eventId": str(self.eventId),
"ingestionTime": self.ingestionTime,
"eventId": str(self.event_id),
"ingestionTime": self.ingestion_time,
# "logStreamName":
"message": self.message,
"timestamp": self.timestamp,
@ -41,7 +44,7 @@ class LogEvent(BaseModel):
def to_response_dict(self):
return {
"ingestionTime": self.ingestionTime,
"ingestionTime": self.ingestion_time,
"message": self.message,
"timestamp": self.timestamp,
}
@ -58,13 +61,13 @@ class LogStream(BaseModel):
log_group=log_group,
log_stream=name,
)
self.creationTime = int(unix_time_millis())
self.firstEventTimestamp = None
self.lastEventTimestamp = None
self.lastIngestionTime = None
self.logStreamName = name
self.storedBytes = 0
self.uploadSequenceToken = (
self.creation_time = int(unix_time_millis())
self.first_event_timestamp = None
self.last_event_timestamp = None
self.last_ingestion_time = None
self.log_stream_name = name
self.stored_bytes = 0
self.upload_sequence_token = (
0  # presumably the token used for sequenceToken by put_log_events
)
self.events = []
@ -75,10 +78,10 @@ class LogStream(BaseModel):
def _update(self):
# events can be empty when stream is described soon after creation
self.firstEventTimestamp = (
self.first_event_timestamp = (
min([x.timestamp for x in self.events]) if self.events else None
)
self.lastEventTimestamp = (
self.last_event_timestamp = (
max([x.timestamp for x in self.events]) if self.events else None
)
@ -88,16 +91,16 @@ class LogStream(BaseModel):
res = {
"arn": self.arn,
"creationTime": self.creationTime,
"logStreamName": self.logStreamName,
"storedBytes": self.storedBytes,
"creationTime": self.creation_time,
"logStreamName": self.log_stream_name,
"storedBytes": self.stored_bytes,
}
if self.events:
rest = {
"firstEventTimestamp": self.firstEventTimestamp,
"lastEventTimestamp": self.lastEventTimestamp,
"lastIngestionTime": self.lastIngestionTime,
"uploadSequenceToken": str(self.uploadSequenceToken),
"firstEventTimestamp": self.first_event_timestamp,
"lastEventTimestamp": self.last_event_timestamp,
"lastIngestionTime": self.last_ingestion_time,
"uploadSequenceToken": str(self.upload_sequence_token),
}
res.update(rest)
return res
@ -107,21 +110,23 @@ class LogStream(BaseModel):
):
# TODO: ensure sequence_token
# TODO: to be thread safe this would need a lock
self.lastIngestionTime = int(unix_time_millis())
self.last_ingestion_time = int(unix_time_millis())
# TODO: make this match AWS if possible
self.storedBytes += sum([len(log_event["message"]) for log_event in log_events])
self.stored_bytes += sum(
[len(log_event["message"]) for log_event in log_events]
)
events = [
LogEvent(self.lastIngestionTime, log_event) for log_event in log_events
LogEvent(self.last_ingestion_time, log_event) for log_event in log_events
]
self.events += events
self.uploadSequenceToken += 1
self.upload_sequence_token += 1
if self.destination_arn and self.destination_arn.split(":")[2] == "lambda":
from moto.awslambda import lambda_backends # due to circular dependency
lambda_log_events = [
{
"id": event.eventId,
"id": event.event_id,
"timestamp": event.timestamp,
"message": event.message,
}
@ -136,7 +141,7 @@ class LogStream(BaseModel):
lambda_log_events,
)
return "{:056d}".format(self.uploadSequenceToken)
return "{:056d}".format(self.upload_sequence_token)
def get_log_events(
self,
@ -243,7 +248,7 @@ class LogStream(BaseModel):
filter(filter_func, self.events), key=lambda x: x.timestamp
):
event_obj = event.to_filter_dict()
event_obj["logStreamName"] = self.logStreamName
event_obj["logStreamName"] = self.log_stream_name
events.append(event_obj)
return events
@ -252,10 +257,8 @@ class LogGroup(BaseModel):
def __init__(self, region, name, tags, **kwargs):
self.name = name
self.region = region
self.arn = "arn:aws:logs:{region}:1:log-group:{log_group}".format(
region=region, log_group=name
)
self.creationTime = int(unix_time_millis())
self.arn = f"arn:aws:logs:{region}:{moto_core.ACCOUNT_ID}:log-group:{name}"
self.creation_time = int(unix_time_millis())
self.tags = tags
self.streams = dict() # {name: LogStream}
self.retention_in_days = kwargs.get(
@ -289,7 +292,7 @@ class LogGroup(BaseModel):
next_token,
order_by,
):
# responses only logStreamName, creationTime, arn, storedBytes when no events are stored.
# responses only log_stream_name, creation_time, arn, stored_bytes when no events are stored.
log_streams = [
(name, stream.to_describe_dict())
@ -432,7 +435,7 @@ class LogGroup(BaseModel):
)
searched_streams = [
{"logStreamName": stream.logStreamName, "searchedCompletely": True}
{"logStreamName": stream.log_stream_name, "searchedCompletely": True}
for stream in streams
]
return events_page, next_token, searched_streams
@ -440,10 +443,10 @@ class LogGroup(BaseModel):
def to_describe_dict(self):
log_group = {
"arn": self.arn,
"creationTime": self.creationTime,
"creationTime": self.creation_time,
"logGroupName": self.name,
"metricFilterCount": 0,
"storedBytes": sum(s.storedBytes for s in self.streams.values()),
"storedBytes": sum(s.stored_bytes for s in self.streams.values()),
}
# AWS only returns retentionInDays if a value is set for the log group (i.e., not Never Expire)
if self.retention_in_days:
@ -483,7 +486,7 @@ class LogGroup(BaseModel):
if self.subscription_filters[0]["filterName"] == filter_name:
creation_time = self.subscription_filters[0]["creationTime"]
else:
raise LimitExceededException
raise LimitExceededException()
for stream in self.streams.values():
stream.destination_arn = destination_arn
@ -686,9 +689,49 @@ class LogsBackend(BaseBackend):
log_group = self.groups[log_group_name]
return log_group.set_retention_policy(None)
def describe_resource_policies(
self, next_token, limit
): # pylint: disable=unused-argument
"""Return list of resource policies.
The next_token and limit arguments are ignored. The maximum
number of resource policies per region is a small number (less
than 50), so pagination isn't needed.
"""
policies = []
for policy_name, policy_info in self.resource_policies.items():
policies.append(
{
"policyName": policy_name,
"policyDocument": policy_info["policyDocument"],
"lastUpdatedTime": policy_info["lastUpdatedTime"],
}
)
return policies
def put_resource_policy(self, policy_name, policy_doc):
policy = {"policyName": policy_name, "policyDocument": policy_doc}
"""Create resource policy and return dict of policy name and doc."""
if len(self.resource_policies) == MAX_RESOURCE_POLICIES_PER_REGION:
raise LimitExceededException()
policy = {
"policyName": policy_name,
"policyDocument": policy_doc,
"lastUpdatedTime": int(unix_time_millis()),
}
self.resource_policies[policy_name] = policy
return {"resourcePolicy": policy}
def delete_resource_policy(self, policy_name):
"""Remove resource policy with a policy name matching given name."""
if policy_name not in self.resource_policies:
raise ResourceNotFoundException(
msg=f"Policy with name [{policy_name}] does not exist"
)
del self.resource_policies[policy_name]
return ""
def list_tags_log_group(self, log_group_name):
if log_group_name not in self.groups:
@ -760,9 +803,13 @@ class LogsBackend(BaseBackend):
logs_backends = {}
for region in Session().get_available_regions("logs"):
logs_backends[region] = LogsBackend(region)
for region in Session().get_available_regions("logs", partition_name="aws-us-gov"):
logs_backends[region] = LogsBackend(region)
for region in Session().get_available_regions("logs", partition_name="aws-cn"):
logs_backends[region] = LogsBackend(region)
for available_region in Session().get_available_regions("logs"):
logs_backends[available_region] = LogsBackend(available_region)
for available_region in Session().get_available_regions(
"logs", partition_name="aws-us-gov"
):
logs_backends[available_region] = LogsBackend(available_region)
for available_region in Session().get_available_regions(
"logs", partition_name="aws-cn"
):
logs_backends[available_region] = LogsBackend(available_region)
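
The new backend methods can also be driven directly, without boto3, through the per-region table built above; a small sketch (region key and policy document are illustrative):

from moto.logs.models import logs_backends

backend = logs_backends["us-east-1"]

# put_resource_policy stores the policy and stamps lastUpdatedTime.
result = backend.put_resource_policy("demo", '{"Version": "2012-10-17"}')
assert result["resourcePolicy"]["policyName"] == "demo"

# next_token and limit are accepted but ignored, per the docstring.
assert len(backend.describe_resource_policies(None, None)) == 1

# Unknown names raise ResourceNotFoundException; known ones are removed.
backend.delete_resource_policy("demo")
assert backend.describe_resource_policies(None, None) == []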

File: moto/logs/responses.py

@ -1,8 +1,9 @@
from moto.core.responses import BaseResponse
from .models import logs_backends
import json
from .exceptions import InvalidParameterException
from moto.core.responses import BaseResponse
from .models import logs_backends
# See http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/Welcome.html
@ -167,10 +168,21 @@ class LogsResponse(BaseResponse):
self.logs_backend.delete_retention_policy(log_group_name)
return ""
def describe_resource_policies(self):
next_token = self._get_param("nextToken")
limit = self._get_param("limit")
policies = self.logs_backend.describe_resource_policies(next_token, limit)
return json.dumps({"resourcePolicies": policies})
def put_resource_policy(self):
policy_name = self._get_param("policyName")
policy_doc = self._get_param("policyDocument")
self.logs_backend.put_resource_policy(policy_name, policy_doc)
result = self.logs_backend.put_resource_policy(policy_name, policy_doc)
return json.dumps(result)
def delete_resource_policy(self):
policy_name = self._get_param("policyName")
self.logs_backend.delete_resource_policy(policy_name)
return ""
def list_tags_log_group(self):
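
For reference, the mocked DescribeResourcePolicies endpoint returns a body shaped like the following (a sketch; lastUpdatedTime is epoch milliseconds and all values are illustrative):

{
    "resourcePolicies": [
        {
            "policyName": "test_policy",
            "policyDocument": "<policy JSON as a string>",
            "lastUpdatedTime": 1628316255000
        }
    ]
}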

File: tests/test_logs/test_logs.py

@ -1,15 +1,39 @@
import json
import os
import time
from unittest import SkipTest
import boto3
import pytest
import sure # noqa
from botocore.exceptions import ClientError
from moto import mock_logs, settings
from moto.core.utils import unix_time_millis
from moto.logs.models import MAX_RESOURCE_POLICIES_PER_REGION
_logs_region = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
TEST_REGION = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
@pytest.fixture
def json_policy_doc():
"""Returns a policy document in JSON format.
The ARN is bogus, but that shouldn't matter for the test.
"""
return json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Route53LogsToCloudWatchLogs",
"Effect": "Allow",
"Principal": {"Service": ["route53.amazonaws.com"]},
"Action": "logs:PutLogEvents",
"Resource": "log_arn",
}
],
}
)
@mock_logs
@ -22,7 +46,7 @@ _logs_region = "us-east-1" if settings.TEST_SERVER_MODE else "us-west-2"
)
def test_create_log_group(kms_key_id):
# Given
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
create_logs_params = dict(logGroupName="dummy")
if kms_key_id:
@ -45,7 +69,7 @@ def test_create_log_group(kms_key_id):
@mock_logs
def test_exceptions():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
log_stream_name = "dummp-stream"
conn.create_log_group(logGroupName=log_group_name)
@ -79,7 +103,7 @@ def test_exceptions():
@mock_logs
def test_put_logs():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
log_stream_name = "stream"
conn.create_log_group(logGroupName=log_group_name)
@ -88,22 +112,22 @@ def test_put_logs():
{"timestamp": 0, "message": "hello"},
{"timestamp": 0, "message": "world"},
]
putRes = conn.put_log_events(
put_results = conn.put_log_events(
logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages
)
res = conn.get_log_events(
logGroupName=log_group_name, logStreamName=log_stream_name
)
events = res["events"]
nextSequenceToken = putRes["nextSequenceToken"]
assert isinstance(nextSequenceToken, str) == True
assert len(nextSequenceToken) == 56
next_sequence_token = put_results["nextSequenceToken"]
assert isinstance(next_sequence_token, str)
assert len(next_sequence_token) == 56
events.should.have.length_of(2)
@mock_logs
def test_filter_logs_interleaved():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
log_stream_name = "stream"
conn.create_log_group(logGroupName=log_group_name)
@ -129,7 +153,7 @@ def test_filter_logs_interleaved():
def test_filter_logs_raises_if_filter_pattern():
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true":
raise SkipTest("Does not work in server mode due to error in Workzeug")
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
log_stream_name = "stream"
conn.create_log_group(logGroupName=log_group_name)
@ -151,7 +175,7 @@ def test_filter_logs_raises_if_filter_pattern():
@mock_logs
def test_filter_logs_paging():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "/aws/dummy"
log_stream_name = "stream/stage"
conn.create_log_group(logGroupName=log_group_name)
@ -210,7 +234,7 @@ def test_filter_logs_paging():
@mock_logs
def test_put_retention_policy():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
response = conn.create_log_group(logGroupName=log_group_name)
@ -225,7 +249,7 @@ def test_put_retention_policy():
@mock_logs
def test_delete_retention_policy():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
response = conn.create_log_group(logGroupName=log_group_name)
@ -239,14 +263,130 @@ def test_delete_retention_policy():
response = conn.describe_log_groups(logGroupNamePrefix=log_group_name)
assert len(response["logGroups"]) == 1
assert response["logGroups"][0].get("retentionInDays") == None
assert response["logGroups"][0].get("retentionInDays") is None
response = conn.delete_log_group(logGroupName=log_group_name)
conn.delete_log_group(logGroupName=log_group_name)
@mock_logs
def test_put_resource_policy():
client = boto3.client("logs", TEST_REGION)
# For this test a policy document with a valid ARN will be used.
log_group_name = "test_log_group"
client.create_log_group(logGroupName=log_group_name)
log_group_info = client.describe_log_groups(logGroupNamePrefix=log_group_name)
policy_name = "test_policy"
policy_doc = json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Route53LogsToCloudWatchLogs",
"Effect": "Allow",
"Principal": {"Service": ["route53.amazonaws.com"]},
"Action": "logs:PutLogEvents",
"Resource": log_group_info["logGroups"][0]["arn"],
}
],
}
)
response = client.put_resource_policy(
policyName=policy_name, policyDocument=policy_doc
)
assert response["resourcePolicy"]["policyName"] == policy_name
assert response["resourcePolicy"]["policyDocument"] == policy_doc
assert response["resourcePolicy"]["lastUpdatedTime"] <= int(unix_time_millis())
client.delete_log_group(logGroupName=log_group_name)
@mock_logs
def test_put_resource_policy_too_many(json_policy_doc):
client = boto3.client("logs", TEST_REGION)
# Create the maximum number of resource policies.
for idx in range(MAX_RESOURCE_POLICIES_PER_REGION):
policy_name = f"test_policy_{idx}"
client.put_resource_policy(
policyName=policy_name, policyDocument=json_policy_doc
)
# Now create one more policy, which should generate an error.
with pytest.raises(ClientError) as exc:
client.put_resource_policy(
policyName="too_many", policyDocument=json.dumps(json_policy_doc)
)
exc_value = exc.value
exc_value.operation_name.should.equal("PutResourcePolicy")
exc_value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
exc_value.response["Error"]["Code"].should.equal("LimitExceededException")
exc_value.response["Error"]["Message"].should.contain("Resource limit exceeded.")
@mock_logs
def test_delete_resource_policy(json_policy_doc):
client = boto3.client("logs", TEST_REGION)
# Create a bunch of resource policies so we can give delete a workout.
base_policy_name = "test_policy"
for idx in range(MAX_RESOURCE_POLICIES_PER_REGION):
client.put_resource_policy(
policyName=f"{base_policy_name}_{idx}", policyDocument=json_policy_doc
)
# Verify that all those resource policies can be deleted.
for idx in range(MAX_RESOURCE_POLICIES_PER_REGION):
client.delete_resource_policy(policyName=f"{base_policy_name}_{idx}")
# Verify there are no resource policies.
response = client.describe_resource_policies()
policies = response["resourcePolicies"]
assert not policies
# Try deleting a non-existent resource policy.
with pytest.raises(ClientError) as exc:
client.delete_resource_policy(policyName="non-existent")
exc_value = exc.value
exc_value.operation_name.should.equal("DeleteResourcePolicy")
exc_value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
exc_value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
exc_value.response["Error"]["Message"].should.contain(
"Policy with name [non-existent] does not exist"
)
@mock_logs
def test_describe_resource_policies(json_policy_doc):
client = boto3.client("logs", TEST_REGION)
# Create the maximum number of resource policies so there's something
# to retrieve.
for idx in range(MAX_RESOURCE_POLICIES_PER_REGION):
policy_name = f"test_policy_{idx}"
client.put_resource_policy(
policyName=policy_name, policyDocument=json_policy_doc
)
# Retrieve all of the resource policies that were just created.
response = client.describe_resource_policies(limit=50)
assert "resourcePolicies" in response
policies = response["resourcePolicies"]
assert len(policies) == MAX_RESOURCE_POLICIES_PER_REGION
# Verify the retrieved list is valid.
now_millis = int(unix_time_millis())
for idx, policy in enumerate(policies):
assert policy["policyName"] == f"test_policy_{idx}"
assert policy["policyDocument"] == json_policy_doc
assert policy["lastUpdatedTime"] <= now_millis
@mock_logs
def test_get_log_events():
client = boto3.client("logs", "us-west-2")
client = boto3.client("logs", TEST_REGION)
log_group_name = "test"
log_stream_name = "stream"
client.create_log_group(logGroupName=log_group_name)
@ -326,7 +466,7 @@ def test_get_log_events():
@mock_logs
def test_get_log_events_with_start_from_head():
client = boto3.client("logs", "us-west-2")
client = boto3.client("logs", TEST_REGION)
log_group_name = "test"
log_stream_name = "stream"
client.create_log_group(logGroupName=log_group_name)
@ -409,44 +549,44 @@ def test_get_log_events_with_start_from_head():
@mock_logs
def test_get_log_events_errors():
client = boto3.client("logs", "us-west-2")
client = boto3.client("logs", TEST_REGION)
log_group_name = "test"
log_stream_name = "stream"
client.create_log_group(logGroupName=log_group_name)
client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
with pytest.raises(ClientError) as e:
with pytest.raises(ClientError) as exc:
client.get_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
nextToken="n/00000000000000000000000000000000000000000000000000000000",
)
ex = e.value
ex.operation_name.should.equal("GetLogEvents")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.equal("InvalidParameterException")
ex.response["Error"]["Message"].should.contain(
exc_value = exc.value
exc_value.operation_name.should.equal("GetLogEvents")
exc_value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
exc_value.response["Error"]["Code"].should.equal("InvalidParameterException")
exc_value.response["Error"]["Message"].should.contain(
"The specified nextToken is invalid."
)
with pytest.raises(ClientError) as e:
with pytest.raises(ClientError) as exc:
client.get_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
nextToken="not-existing-token",
)
ex = e.value
ex.operation_name.should.equal("GetLogEvents")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.equal("InvalidParameterException")
ex.response["Error"]["Message"].should.contain(
exc_value = exc.value
exc_value.operation_name.should.equal("GetLogEvents")
exc_value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
exc_value.response["Error"]["Code"].should.equal("InvalidParameterException")
exc_value.response["Error"]["Message"].should.contain(
"The specified nextToken is invalid."
)
@mock_logs
def test_list_tags_log_group():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
tags = {"tag_key_1": "tag_value_1", "tag_key_2": "tag_value_2"}
@ -464,7 +604,7 @@ def test_list_tags_log_group():
@mock_logs
def test_tag_log_group():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
tags = {"tag_key_1": "tag_value_1"}
response = conn.create_log_group(logGroupName=log_group_name)
@ -492,7 +632,7 @@ def test_tag_log_group():
@mock_logs
def test_untag_log_group():
conn = boto3.client("logs", "us-west-2")
conn = boto3.client("logs", TEST_REGION)
log_group_name = "dummy"
response = conn.create_log_group(logGroupName=log_group_name)
@ -530,15 +670,15 @@ def test_describe_subscription_filters_errors():
client = boto3.client("logs", "us-east-1")
# when
with pytest.raises(ClientError) as e:
with pytest.raises(ClientError) as exc:
client.describe_subscription_filters(logGroupName="not-existing-log-group",)
# then
ex = e.value
ex.operation_name.should.equal("DescribeSubscriptionFilters")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
exc_value = exc.value
exc_value.operation_name.should.equal("DescribeSubscriptionFilters")
exc_value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
exc_value.response["Error"]["Code"].should.contain("ResourceNotFoundException")
exc_value.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
@ -604,7 +744,7 @@ def test_describe_log_streams_paging():
resp["logStreams"].should.have.length_of(2)
resp["logStreams"][0]["arn"].should.contain(log_group_name)
resp["nextToken"].should.equal(
u"{}@{}".format(log_group_name, resp["logStreams"][1]["logStreamName"])
"{}@{}".format(log_group_name, resp["logStreams"][1]["logStreamName"])
)
resp = client.describe_log_streams(
@ -613,7 +753,7 @@ def test_describe_log_streams_paging():
resp["logStreams"].should.have.length_of(1)
resp["logStreams"][0]["arn"].should.contain(log_group_name)
resp["nextToken"].should.equal(
u"{}@{}".format(log_group_name, resp["logStreams"][0]["logStreamName"])
"{}@{}".format(log_group_name, resp["logStreams"][0]["logStreamName"])
)
resp = client.describe_log_streams(
@ -652,7 +792,7 @@ def test_start_query():
assert "queryId" in response
with pytest.raises(ClientError) as e:
with pytest.raises(ClientError) as exc:
client.start_query(
logGroupName="/aws/codebuild/lowercase-dev-invalid",
startTime=int(time.time()),
@ -661,8 +801,8 @@ def test_start_query():
)
# then
ex = e.value
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
exc_value = exc.value
exc_value.response["Error"]["Code"].should.contain("ResourceNotFoundException")
exc_value.response["Error"]["Message"].should.equal(
"The specified log group does not exist"
)
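
To run just the new resource-policy tests locally, something like the following should work (path taken from the test file above; -k matches on test-name substrings):

pytest tests/test_logs/test_logs.py -k resource_policy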