CloudWatchLogs:filter_log_events() now supports the filterPattern-parameter (#5428)
This commit is contained in:
parent
0dfb61fbcf
commit
6035a44d79
@ -58,6 +58,11 @@ logs
|
|||||||
- [X] describe_subscription_filters
|
- [X] describe_subscription_filters
|
||||||
- [ ] disassociate_kms_key
|
- [ ] disassociate_kms_key
|
||||||
- [X] filter_log_events
|
- [X] filter_log_events
|
||||||
|
|
||||||
|
The following filter patterns are currently supported: Single Terms, Multiple Terms, Exact Phrases.
|
||||||
|
If the pattern is not supported, all events are returned.
|
||||||
|
|
||||||
|
|
||||||
- [X] get_log_events
|
- [X] get_log_events
|
||||||
- [ ] get_log_group_fields
|
- [ ] get_log_group_fields
|
||||||
- [ ] get_log_record
|
- [ ] get_log_record
|
||||||
|
@ -14,7 +14,7 @@ from moto.logs.exceptions import (
|
|||||||
LimitExceededException,
|
LimitExceededException,
|
||||||
)
|
)
|
||||||
from moto.s3.models import s3_backends
|
from moto.s3.models import s3_backends
|
||||||
from .utils import PAGINATION_MODEL
|
from .utils import PAGINATION_MODEL, EventMessageFilter
|
||||||
|
|
||||||
MAX_RESOURCE_POLICIES_PER_REGION = 10
|
MAX_RESOURCE_POLICIES_PER_REGION = 10
|
||||||
|
|
||||||
@ -234,9 +234,6 @@ class LogStream(BaseModel):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def filter_log_events(self, start_time, end_time, filter_pattern):
|
def filter_log_events(self, start_time, end_time, filter_pattern):
|
||||||
if filter_pattern:
|
|
||||||
raise NotImplementedError("filter_pattern is not yet implemented")
|
|
||||||
|
|
||||||
def filter_func(event):
|
def filter_func(event):
|
||||||
if start_time and event.timestamp < start_time:
|
if start_time and event.timestamp < start_time:
|
||||||
return False
|
return False
|
||||||
@ -244,6 +241,9 @@ class LogStream(BaseModel):
|
|||||||
if end_time and event.timestamp > end_time:
|
if end_time and event.timestamp > end_time:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
if not EventMessageFilter(filter_pattern).matches(event.message):
|
||||||
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
events = []
|
events = []
|
||||||
@ -769,6 +769,10 @@ class LogsBackend(BaseBackend):
|
|||||||
filter_pattern,
|
filter_pattern,
|
||||||
interleaved,
|
interleaved,
|
||||||
):
|
):
|
||||||
|
"""
|
||||||
|
The following filter patterns are currently supported: Single Terms, Multiple Terms, Exact Phrases.
|
||||||
|
If the pattern is not supported, all events are returned.
|
||||||
|
"""
|
||||||
if log_group_name not in self.groups:
|
if log_group_name not in self.groups:
|
||||||
raise ResourceNotFoundException()
|
raise ResourceNotFoundException()
|
||||||
if limit and limit > 1000:
|
if limit and limit > 1000:
|
||||||
|
@ -13,3 +13,47 @@ PAGINATION_MODEL = {
|
|||||||
"unique_attribute": "arn",
|
"unique_attribute": "arn",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class FilterPattern:
    """Base class for parsed CloudWatch Logs filter patterns.

    Holds the raw pattern text; subclasses decide whether a given log
    message satisfies the pattern.
    """

    def __init__(self, term):
        self.term = term

    def matches(self, message):
        """Return True if *message* satisfies this pattern.

        Raises NotImplementedError so a missing override fails loudly
        instead of surfacing as an AttributeError.
        """
        raise NotImplementedError


class QuotedTermFilterPattern(FilterPattern):
    """Exact-phrase pattern, e.g. '"hello world"'."""

    def matches(self, message):
        # self.term still carries the surrounding quotes - strip them
        # before the substring check.
        return self.term[1:-1] in message


class SingleTermFilterPattern(FilterPattern):
    """One or more unquoted terms; every term must occur in the message."""

    def matches(self, message):
        required_words = self.term.split(" ")
        # Generator avoids materializing an intermediate list.
        return all(word in message for word in required_words)


class UnsupportedFilterPattern(FilterPattern):
    """Fallback for pattern syntax moto does not support yet.

    Matches every message, so an unsupported filter returns all events.
    """

    def matches(self, message):  # pylint: disable=unused-argument
        return True


class EventMessageFilter:
    """Parse a filterPattern string and expose a matches() predicate.

    Supported: single terms, multiple terms, exact (quoted) phrases.
    Anything else falls back to UnsupportedFilterPattern (match all).
    """

    def __init__(self, pattern: str):
        # Only the first character determines the pattern type; the
        # previous char-by-char loop merely rebuilt the pattern unchanged.
        if not pattern:
            filter_class = UnsupportedFilterPattern
        elif pattern[0].isalpha():
            filter_class = SingleTermFilterPattern
        elif pattern[0] == '"':
            filter_class = QuotedTermFilterPattern
        else:
            # e.g. JSON metric patterns like '{$.message = "x"}'
            filter_class = UnsupportedFilterPattern
        self.filter_type = filter_class(pattern)

    def matches(self, message):
        return self.filter_type.matches(message)
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
|
||||||
import time
|
import time
|
||||||
import sure # noqa # pylint: disable=unused-import
|
import sure # noqa # pylint: disable=unused-import
|
||||||
from unittest import SkipTest
|
|
||||||
from datetime import timedelta, datetime
|
from datetime import timedelta, datetime
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
@ -424,54 +422,6 @@ def test_put_logs():
|
|||||||
events.should.have.length_of(2)
|
events.should.have.length_of(2)
|
||||||
|
|
||||||
|
|
||||||
@mock_logs
|
|
||||||
def test_filter_logs_interleaved():
|
|
||||||
conn = boto3.client("logs", TEST_REGION)
|
|
||||||
log_group_name = "dummy"
|
|
||||||
log_stream_name = "stream"
|
|
||||||
conn.create_log_group(logGroupName=log_group_name)
|
|
||||||
conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
|
|
||||||
messages = [
|
|
||||||
{"timestamp": 0, "message": "hello"},
|
|
||||||
{"timestamp": 0, "message": "world"},
|
|
||||||
]
|
|
||||||
conn.put_log_events(
|
|
||||||
logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages
|
|
||||||
)
|
|
||||||
res = conn.filter_log_events(
|
|
||||||
logGroupName=log_group_name, logStreamNames=[log_stream_name], interleaved=True
|
|
||||||
)
|
|
||||||
events = res["events"]
|
|
||||||
for original_message, resulting_event in zip(messages, events):
|
|
||||||
resulting_event["eventId"].should.equal(str(resulting_event["eventId"]))
|
|
||||||
resulting_event["timestamp"].should.equal(original_message["timestamp"])
|
|
||||||
resulting_event["message"].should.equal(original_message["message"])
|
|
||||||
|
|
||||||
|
|
||||||
@mock_logs
|
|
||||||
def test_filter_logs_raises_if_filter_pattern():
|
|
||||||
if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true":
|
|
||||||
raise SkipTest("Does not work in server mode due to error in Workzeug")
|
|
||||||
conn = boto3.client("logs", TEST_REGION)
|
|
||||||
log_group_name = "dummy"
|
|
||||||
log_stream_name = "stream"
|
|
||||||
conn.create_log_group(logGroupName=log_group_name)
|
|
||||||
conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
|
|
||||||
messages = [
|
|
||||||
{"timestamp": 0, "message": "hello"},
|
|
||||||
{"timestamp": 0, "message": "world"},
|
|
||||||
]
|
|
||||||
conn.put_log_events(
|
|
||||||
logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages
|
|
||||||
)
|
|
||||||
with pytest.raises(NotImplementedError):
|
|
||||||
conn.filter_log_events(
|
|
||||||
logGroupName=log_group_name,
|
|
||||||
logStreamNames=[log_stream_name],
|
|
||||||
filterPattern='{$.message = "hello"}',
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@mock_logs
|
@mock_logs
|
||||||
def test_put_log_events_in_wrong_order():
|
def test_put_log_events_in_wrong_order():
|
||||||
conn = boto3.client("logs", "us-east-1")
|
conn = boto3.client("logs", "us-east-1")
|
||||||
@ -544,103 +494,6 @@ def test_put_log_events_in_the_future(minutes):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@mock_logs
|
|
||||||
def test_put_log_events_now():
|
|
||||||
conn = boto3.client("logs", "us-east-1")
|
|
||||||
log_group_name = "test"
|
|
||||||
log_stream_name = "teststream"
|
|
||||||
conn.create_log_group(logGroupName=log_group_name)
|
|
||||||
conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
|
|
||||||
|
|
||||||
ts_1 = int(unix_time_millis())
|
|
||||||
ts_2 = int(unix_time_millis(datetime.utcnow() + timedelta(minutes=5)))
|
|
||||||
ts_3 = int(unix_time_millis(datetime.utcnow() + timedelta(days=1)))
|
|
||||||
|
|
||||||
messages = [
|
|
||||||
{"message": f"Message {idx}", "timestamp": ts}
|
|
||||||
for idx, ts in enumerate([ts_1, ts_2, ts_3])
|
|
||||||
]
|
|
||||||
|
|
||||||
resp = conn.put_log_events(
|
|
||||||
logGroupName=log_group_name,
|
|
||||||
logStreamName=log_stream_name,
|
|
||||||
logEvents=messages,
|
|
||||||
sequenceToken="49599396607703531511419593985621160512859251095480828066",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Message 2 was too new
|
|
||||||
resp.should.have.key("rejectedLogEventsInfo").should.equal(
|
|
||||||
{"tooNewLogEventStartIndex": 2}
|
|
||||||
)
|
|
||||||
# Message 0 and 1 were persisted though
|
|
||||||
events = conn.filter_log_events(
|
|
||||||
logGroupName=log_group_name, logStreamNames=[log_stream_name], limit=20
|
|
||||||
)["events"]
|
|
||||||
messages = [e["message"] for e in events]
|
|
||||||
messages.should.contain("Message 0")
|
|
||||||
messages.should.contain("Message 1")
|
|
||||||
messages.shouldnt.contain("Message 2")
|
|
||||||
|
|
||||||
|
|
||||||
@mock_logs
|
|
||||||
def test_filter_logs_paging():
|
|
||||||
conn = boto3.client("logs", TEST_REGION)
|
|
||||||
log_group_name = "/aws/dummy"
|
|
||||||
log_stream_name = "stream/stage"
|
|
||||||
conn.create_log_group(logGroupName=log_group_name)
|
|
||||||
conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
|
|
||||||
timestamp = int(unix_time_millis(datetime.utcnow()))
|
|
||||||
messages = []
|
|
||||||
for i in range(25):
|
|
||||||
messages.append(
|
|
||||||
{"message": "Message number {}".format(i), "timestamp": timestamp}
|
|
||||||
)
|
|
||||||
timestamp += 100
|
|
||||||
|
|
||||||
conn.put_log_events(
|
|
||||||
logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages
|
|
||||||
)
|
|
||||||
res = conn.filter_log_events(
|
|
||||||
logGroupName=log_group_name, logStreamNames=[log_stream_name], limit=20
|
|
||||||
)
|
|
||||||
events = res["events"]
|
|
||||||
events.should.have.length_of(20)
|
|
||||||
res["nextToken"].should.equal("/aws/dummy@stream/stage@" + events[-1]["eventId"])
|
|
||||||
|
|
||||||
res = conn.filter_log_events(
|
|
||||||
logGroupName=log_group_name,
|
|
||||||
logStreamNames=[log_stream_name],
|
|
||||||
limit=20,
|
|
||||||
nextToken=res["nextToken"],
|
|
||||||
)
|
|
||||||
events += res["events"]
|
|
||||||
events.should.have.length_of(25)
|
|
||||||
res.should_not.have.key("nextToken")
|
|
||||||
|
|
||||||
for original_message, resulting_event in zip(messages, events):
|
|
||||||
resulting_event["eventId"].should.equal(str(resulting_event["eventId"]))
|
|
||||||
resulting_event["timestamp"].should.equal(original_message["timestamp"])
|
|
||||||
resulting_event["message"].should.equal(original_message["message"])
|
|
||||||
|
|
||||||
res = conn.filter_log_events(
|
|
||||||
logGroupName=log_group_name,
|
|
||||||
logStreamNames=[log_stream_name],
|
|
||||||
limit=20,
|
|
||||||
nextToken="invalid-token",
|
|
||||||
)
|
|
||||||
res["events"].should.have.length_of(0)
|
|
||||||
res.should_not.have.key("nextToken")
|
|
||||||
|
|
||||||
res = conn.filter_log_events(
|
|
||||||
logGroupName=log_group_name,
|
|
||||||
logStreamNames=[log_stream_name],
|
|
||||||
limit=20,
|
|
||||||
nextToken="wrong-group@stream@999",
|
|
||||||
)
|
|
||||||
res["events"].should.have.length_of(0)
|
|
||||||
res.should_not.have.key("nextToken")
|
|
||||||
|
|
||||||
|
|
||||||
@mock_logs
|
@mock_logs
|
||||||
def test_put_retention_policy():
|
def test_put_retention_policy():
|
||||||
conn = boto3.client("logs", TEST_REGION)
|
conn = boto3.client("logs", TEST_REGION)
|
||||||
|
191
tests/test_logs/test_logs_filter.py
Normal file
191
tests/test_logs/test_logs_filter.py
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
import boto3
|
||||||
|
import sure # noqa # pylint: disable=unused-import
|
||||||
|
from unittest import TestCase
|
||||||
|
from datetime import timedelta, datetime
|
||||||
|
|
||||||
|
from moto import mock_logs
|
||||||
|
from moto.core.utils import unix_time_millis
|
||||||
|
|
||||||
|
TEST_REGION = "eu-west-1"
|
||||||
|
|
||||||
|
|
||||||
|
class TestLogFilter(TestCase):
    """Shared fixture: one logs client plus a pre-created group and stream.

    Subclasses rely on self.conn, self.log_group_name and
    self.log_stream_name being set up here.
    """

    def setUp(self) -> None:
        self.log_group_name = "dummy"
        self.log_stream_name = "stream"
        self.conn = boto3.client("logs", TEST_REGION)
        # The group must exist before a stream can be attached to it.
        self.conn.create_log_group(logGroupName=self.log_group_name)
        self.conn.create_log_stream(
            logGroupName=self.log_group_name,
            logStreamName=self.log_stream_name,
        )
|
||||||
|
|
||||||
|
|
||||||
|
@mock_logs
class TestLogFilterParameters(TestLogFilter):
    """filter_log_events() parameter behaviour: interleaving, limits, paging."""

    def setUp(self) -> None:
        super().setUp()

    def test_filter_logs_interleaved(self):
        # Two events sharing a timestamp; interleaved=True should still
        # return them with their original content.
        messages = [
            {"timestamp": 0, "message": "hello"},
            {"timestamp": 0, "message": "world"},
        ]
        self.conn.put_log_events(
            logGroupName=self.log_group_name,
            logStreamName=self.log_stream_name,
            logEvents=messages,
        )
        res = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            interleaved=True,
        )
        events = res["events"]
        # Each returned event mirrors the message it was created from.
        for original_message, resulting_event in zip(messages, events):
            resulting_event["eventId"].should.equal(str(resulting_event["eventId"]))
            resulting_event["timestamp"].should.equal(original_message["timestamp"])
            resulting_event["message"].should.equal(original_message["message"])

    def test_put_log_events_now(self):
        # Three timestamps: now, +5 minutes, +1 day - the last one is
        # beyond what CloudWatch accepts and should be rejected.
        ts_1 = int(unix_time_millis())
        ts_2 = int(unix_time_millis(datetime.utcnow() + timedelta(minutes=5)))
        ts_3 = int(unix_time_millis(datetime.utcnow() + timedelta(days=1)))

        messages = [
            {"message": f"Message {idx}", "timestamp": ts}
            for idx, ts in enumerate([ts_1, ts_2, ts_3])
        ]

        resp = self.conn.put_log_events(
            logGroupName=self.log_group_name,
            logStreamName=self.log_stream_name,
            logEvents=messages,
            sequenceToken="49599396607703531511419593985621160512859251095480828066",
        )

        # Message 2 was too new
        resp.should.have.key("rejectedLogEventsInfo").should.equal(
            {"tooNewLogEventStartIndex": 2}
        )
        # Message 0 and 1 were persisted though
        events = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            limit=20,
        )["events"]
        messages = [e["message"] for e in events]
        messages.should.contain("Message 0")
        messages.should.contain("Message 1")
        messages.shouldnt.contain("Message 2")

    def test_filter_logs_paging(self):
        # 25 events with strictly increasing timestamps so paging order
        # is deterministic.
        timestamp = int(unix_time_millis(datetime.utcnow()))
        messages = []
        for i in range(25):
            messages.append(
                {"message": "Message number {}".format(i), "timestamp": timestamp}
            )
            timestamp += 100

        self.conn.put_log_events(
            logGroupName=self.log_group_name,
            logStreamName=self.log_stream_name,
            logEvents=messages,
        )
        res = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            limit=20,
        )
        events = res["events"]
        events.should.have.length_of(20)
        # Token format is "<group>@<stream>@<last-event-id>".
        res["nextToken"].should.equal("dummy@stream@" + events[-1]["eventId"])

        # Second page: the remaining 5 events, and no further token.
        res = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            limit=20,
            nextToken=res["nextToken"],
        )
        events += res["events"]
        events.should.have.length_of(25)
        res.should_not.have.key("nextToken")

        for original_message, resulting_event in zip(messages, events):
            resulting_event["eventId"].should.equal(str(resulting_event["eventId"]))
            resulting_event["timestamp"].should.equal(original_message["timestamp"])
            resulting_event["message"].should.equal(original_message["message"])

        # A token pointing at a different group yields no events.
        res = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            limit=20,
            nextToken="wrong-group@stream@999",
        )
        res["events"].should.have.length_of(0)
        res.should_not.have.key("nextToken")

    def test_filter_logs_paging__unknown_token(self):
        # A token that does not follow the group@stream@id format is
        # treated as unknown: empty result, no nextToken.
        res = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            limit=20,
            nextToken="invalid-token",
        )
        res["events"].should.have.length_of(0)
        res.should_not.have.key("nextToken")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_logs
class TestLogsFilterPattern(TestLogFilter):
    """filterPattern matching: single terms, multi terms, quoted phrases."""

    def setUp(self) -> None:
        super().setUp()
        # Six messages chosen to discriminate between single-term,
        # multi-term and quoted-phrase pattern matching.
        now = int(unix_time_millis(datetime.utcnow()))
        messages = [
            {"timestamp": now, "message": "hello"},
            {"timestamp": now, "message": "world"},
            {"timestamp": now, "message": "hello world"},
            {"timestamp": now, "message": "goodbye world"},
            {"timestamp": now, "message": "hello cruela"},
            {"timestamp": now, "message": "goodbye cruel world"},
        ]
        self.conn.put_log_events(
            logGroupName=self.log_group_name,
            logStreamName=self.log_stream_name,
            logEvents=messages,
        )

    def test_unknown_pattern(self):
        # JSON metric-filter syntax is unsupported, so the filter matches
        # everything and all 6 events come back.
        events = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            filterPattern='{$.message = "hello"}',
        )["events"]
        events.should.have.length_of(6)

    def test_simple_word_pattern(self):
        # A single term matches every message containing it.
        events = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            filterPattern="hello",
        )["events"]
        messages = [e["message"] for e in events]
        set(messages).should.equal({"hello", "hello cruela", "hello world"})

    def test_multiple_words_pattern(self):
        # Multiple unquoted terms are AND-ed: both words must appear.
        events = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            filterPattern="goodbye world",
        )["events"]
        messages = [e["message"] for e in events]
        set(messages).should.equal({"goodbye world", "goodbye cruel world"})

    def test_quoted_pattern(self):
        # A quoted phrase matches only when the exact phrase occurs -
        # here only "hello cruela" contains the substring "hello cruel".
        events = self.conn.filter_log_events(
            logGroupName=self.log_group_name,
            logStreamNames=[self.log_stream_name],
            filterPattern='"hello cruel"',
        )["events"]
        messages = [e["message"] for e in events]
        set(messages).should.equal({"hello cruela"})
|
Loading…
x
Reference in New Issue
Block a user