From 6035a44d79770a135ffc4e244c23fedc17fc5feb Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 29 Aug 2022 08:47:17 +0000 Subject: [PATCH] CloudWatchLogs:filter_log_events() now supports the filterPattern-parameter (#5428) --- docs/docs/services/logs.rst | 5 + moto/logs/models.py | 12 +- moto/logs/utils.py | 44 +++++++ tests/test_logs/test_logs.py | 147 --------------------- tests/test_logs/test_logs_filter.py | 191 ++++++++++++++++++++++++++++ 5 files changed, 248 insertions(+), 151 deletions(-) create mode 100644 tests/test_logs/test_logs_filter.py diff --git a/docs/docs/services/logs.rst b/docs/docs/services/logs.rst index 4c9fc7a4a..bf0e1b91d 100644 --- a/docs/docs/services/logs.rst +++ b/docs/docs/services/logs.rst @@ -58,6 +58,11 @@ logs - [X] describe_subscription_filters - [ ] disassociate_kms_key - [X] filter_log_events + + The following filter patterns are currently supported: Single Terms, Multiple Terms, Exact Phrases. + If the pattern is not supported, all events are returned. 
+ + - [X] get_log_events - [ ] get_log_group_fields - [ ] get_log_record diff --git a/moto/logs/models.py b/moto/logs/models.py index 7447fd804..c979f960c 100644 --- a/moto/logs/models.py +++ b/moto/logs/models.py @@ -14,7 +14,7 @@ from moto.logs.exceptions import ( LimitExceededException, ) from moto.s3.models import s3_backends -from .utils import PAGINATION_MODEL +from .utils import PAGINATION_MODEL, EventMessageFilter MAX_RESOURCE_POLICIES_PER_REGION = 10 @@ -234,9 +234,6 @@ class LogStream(BaseModel): ) def filter_log_events(self, start_time, end_time, filter_pattern): - if filter_pattern: - raise NotImplementedError("filter_pattern is not yet implemented") - def filter_func(event): if start_time and event.timestamp < start_time: return False @@ -244,6 +241,9 @@ class LogStream(BaseModel): if end_time and event.timestamp > end_time: return False + if not EventMessageFilter(filter_pattern).matches(event.message): + return False + return True events = [] @@ -769,6 +769,10 @@ class LogsBackend(BaseBackend): filter_pattern, interleaved, ): + """ + The following filter patterns are currently supported: Single Terms, Multiple Terms, Exact Phrases. + If the pattern is not supported, all events are returned. 
class FilterPattern:
    """Base class for a parsed filter pattern; holds the pattern's term text."""

    def __init__(self, term):
        self.term = term


class QuotedTermFilterPattern(FilterPattern):
    """Exact-phrase pattern, e.g. the user supplied '"hello world"'.

    The parser (EventMessageFilter) hands us the bare phrase with the
    surrounding quotes already removed, so matching is a plain substring test.
    """

    def matches(self, message):
        return self.term in message


class SingleTermFilterPattern(FilterPattern):
    """One or more unquoted words; every word must occur in the message.

    Matching is substring-based (as moto documents), not word-boundary-based.
    """

    def matches(self, message):
        # split() (no argument) collapses runs of whitespace, so repeated
        # spaces in the pattern cannot produce empty "required words".
        required_words = self.term.split()
        return all(word in message for word in required_words)


class UnsupportedFilterPattern(FilterPattern):
    """Fallback for pattern syntax moto does not implement: match everything."""

    def matches(self, message):  # pylint: disable=unused-argument
        return True


class EventMessageFilter:
    """Parse a CloudWatch Logs filter pattern and test event messages against it.

    Supported pattern kinds: single terms, multiple terms, exact (quoted)
    phrases.  Any other syntax (JSON patterns such as '{$.message = "x"}',
    metric patterns, ...) is treated as unsupported and matches every event,
    mirroring the documented behaviour of filter_log_events in moto.
    """

    def __init__(self, pattern: str):
        if not pattern:
            # No filter supplied (None or ""): every event matches.
            self.filter_type = UnsupportedFilterPattern("")
        elif pattern[0] == '"':
            # Exact phrase: strip the surrounding quotes here, in the parser,
            # so the matcher compares the bare phrase directly.  A missing
            # closing quote is tolerated — the phrase then runs to the end of
            # the pattern instead of silently losing its last character.
            phrase = pattern[1:]
            if phrase.endswith('"'):
                phrase = phrase[:-1]
            self.filter_type = QuotedTermFilterPattern(phrase)
        elif pattern[0].isalpha():
            # One or more plain words, e.g. "goodbye world".
            self.filter_type = SingleTermFilterPattern(pattern)
        else:
            # Anything else (JSON/metric patterns, leading symbols, digits)
            # is not implemented — deliberately match all events.
            self.filter_type = UnsupportedFilterPattern(pattern)

    def matches(self, message):
        """Return True if *message* satisfies the parsed filter pattern."""
        return self.filter_type.matches(message)
"dummy" - log_stream_name = "stream" - conn.create_log_group(logGroupName=log_group_name) - conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) - messages = [ - {"timestamp": 0, "message": "hello"}, - {"timestamp": 0, "message": "world"}, - ] - conn.put_log_events( - logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages - ) - res = conn.filter_log_events( - logGroupName=log_group_name, logStreamNames=[log_stream_name], interleaved=True - ) - events = res["events"] - for original_message, resulting_event in zip(messages, events): - resulting_event["eventId"].should.equal(str(resulting_event["eventId"])) - resulting_event["timestamp"].should.equal(original_message["timestamp"]) - resulting_event["message"].should.equal(original_message["message"]) - - -@mock_logs -def test_filter_logs_raises_if_filter_pattern(): - if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true": - raise SkipTest("Does not work in server mode due to error in Workzeug") - conn = boto3.client("logs", TEST_REGION) - log_group_name = "dummy" - log_stream_name = "stream" - conn.create_log_group(logGroupName=log_group_name) - conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) - messages = [ - {"timestamp": 0, "message": "hello"}, - {"timestamp": 0, "message": "world"}, - ] - conn.put_log_events( - logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages - ) - with pytest.raises(NotImplementedError): - conn.filter_log_events( - logGroupName=log_group_name, - logStreamNames=[log_stream_name], - filterPattern='{$.message = "hello"}', - ) - - @mock_logs def test_put_log_events_in_wrong_order(): conn = boto3.client("logs", "us-east-1") @@ -544,103 +494,6 @@ def test_put_log_events_in_the_future(minutes): ) -@mock_logs -def test_put_log_events_now(): - conn = boto3.client("logs", "us-east-1") - log_group_name = "test" - log_stream_name = "teststream" - 
conn.create_log_group(logGroupName=log_group_name) - conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) - - ts_1 = int(unix_time_millis()) - ts_2 = int(unix_time_millis(datetime.utcnow() + timedelta(minutes=5))) - ts_3 = int(unix_time_millis(datetime.utcnow() + timedelta(days=1))) - - messages = [ - {"message": f"Message {idx}", "timestamp": ts} - for idx, ts in enumerate([ts_1, ts_2, ts_3]) - ] - - resp = conn.put_log_events( - logGroupName=log_group_name, - logStreamName=log_stream_name, - logEvents=messages, - sequenceToken="49599396607703531511419593985621160512859251095480828066", - ) - - # Message 2 was too new - resp.should.have.key("rejectedLogEventsInfo").should.equal( - {"tooNewLogEventStartIndex": 2} - ) - # Message 0 and 1 were persisted though - events = conn.filter_log_events( - logGroupName=log_group_name, logStreamNames=[log_stream_name], limit=20 - )["events"] - messages = [e["message"] for e in events] - messages.should.contain("Message 0") - messages.should.contain("Message 1") - messages.shouldnt.contain("Message 2") - - -@mock_logs -def test_filter_logs_paging(): - conn = boto3.client("logs", TEST_REGION) - log_group_name = "/aws/dummy" - log_stream_name = "stream/stage" - conn.create_log_group(logGroupName=log_group_name) - conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) - timestamp = int(unix_time_millis(datetime.utcnow())) - messages = [] - for i in range(25): - messages.append( - {"message": "Message number {}".format(i), "timestamp": timestamp} - ) - timestamp += 100 - - conn.put_log_events( - logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages - ) - res = conn.filter_log_events( - logGroupName=log_group_name, logStreamNames=[log_stream_name], limit=20 - ) - events = res["events"] - events.should.have.length_of(20) - res["nextToken"].should.equal("/aws/dummy@stream/stage@" + events[-1]["eventId"]) - - res = conn.filter_log_events( - 
logGroupName=log_group_name, - logStreamNames=[log_stream_name], - limit=20, - nextToken=res["nextToken"], - ) - events += res["events"] - events.should.have.length_of(25) - res.should_not.have.key("nextToken") - - for original_message, resulting_event in zip(messages, events): - resulting_event["eventId"].should.equal(str(resulting_event["eventId"])) - resulting_event["timestamp"].should.equal(original_message["timestamp"]) - resulting_event["message"].should.equal(original_message["message"]) - - res = conn.filter_log_events( - logGroupName=log_group_name, - logStreamNames=[log_stream_name], - limit=20, - nextToken="invalid-token", - ) - res["events"].should.have.length_of(0) - res.should_not.have.key("nextToken") - - res = conn.filter_log_events( - logGroupName=log_group_name, - logStreamNames=[log_stream_name], - limit=20, - nextToken="wrong-group@stream@999", - ) - res["events"].should.have.length_of(0) - res.should_not.have.key("nextToken") - - @mock_logs def test_put_retention_policy(): conn = boto3.client("logs", TEST_REGION) diff --git a/tests/test_logs/test_logs_filter.py b/tests/test_logs/test_logs_filter.py new file mode 100644 index 000000000..cb7adfb1f --- /dev/null +++ b/tests/test_logs/test_logs_filter.py @@ -0,0 +1,191 @@ +import boto3 +import sure # noqa # pylint: disable=unused-import +from unittest import TestCase +from datetime import timedelta, datetime + +from moto import mock_logs +from moto.core.utils import unix_time_millis + +TEST_REGION = "eu-west-1" + + +class TestLogFilter(TestCase): + def setUp(self) -> None: + self.conn = boto3.client("logs", TEST_REGION) + self.log_group_name = "dummy" + self.log_stream_name = "stream" + self.conn.create_log_group(logGroupName=self.log_group_name) + self.conn.create_log_stream( + logGroupName=self.log_group_name, logStreamName=self.log_stream_name + ) + + +@mock_logs +class TestLogFilterParameters(TestLogFilter): + def setUp(self) -> None: + super().setUp() + + def 
test_filter_logs_interleaved(self): + messages = [ + {"timestamp": 0, "message": "hello"}, + {"timestamp": 0, "message": "world"}, + ] + self.conn.put_log_events( + logGroupName=self.log_group_name, + logStreamName=self.log_stream_name, + logEvents=messages, + ) + res = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + interleaved=True, + ) + events = res["events"] + for original_message, resulting_event in zip(messages, events): + resulting_event["eventId"].should.equal(str(resulting_event["eventId"])) + resulting_event["timestamp"].should.equal(original_message["timestamp"]) + resulting_event["message"].should.equal(original_message["message"]) + + def test_put_log_events_now(self): + ts_1 = int(unix_time_millis()) + ts_2 = int(unix_time_millis(datetime.utcnow() + timedelta(minutes=5))) + ts_3 = int(unix_time_millis(datetime.utcnow() + timedelta(days=1))) + + messages = [ + {"message": f"Message {idx}", "timestamp": ts} + for idx, ts in enumerate([ts_1, ts_2, ts_3]) + ] + + resp = self.conn.put_log_events( + logGroupName=self.log_group_name, + logStreamName=self.log_stream_name, + logEvents=messages, + sequenceToken="49599396607703531511419593985621160512859251095480828066", + ) + + # Message 2 was too new + resp.should.have.key("rejectedLogEventsInfo").should.equal( + {"tooNewLogEventStartIndex": 2} + ) + # Message 0 and 1 were persisted though + events = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + limit=20, + )["events"] + messages = [e["message"] for e in events] + messages.should.contain("Message 0") + messages.should.contain("Message 1") + messages.shouldnt.contain("Message 2") + + def test_filter_logs_paging(self): + timestamp = int(unix_time_millis(datetime.utcnow())) + messages = [] + for i in range(25): + messages.append( + {"message": "Message number {}".format(i), "timestamp": timestamp} + ) + timestamp += 100 + + 
self.conn.put_log_events( + logGroupName=self.log_group_name, + logStreamName=self.log_stream_name, + logEvents=messages, + ) + res = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + limit=20, + ) + events = res["events"] + events.should.have.length_of(20) + res["nextToken"].should.equal("dummy@stream@" + events[-1]["eventId"]) + + res = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + limit=20, + nextToken=res["nextToken"], + ) + events += res["events"] + events.should.have.length_of(25) + res.should_not.have.key("nextToken") + + for original_message, resulting_event in zip(messages, events): + resulting_event["eventId"].should.equal(str(resulting_event["eventId"])) + resulting_event["timestamp"].should.equal(original_message["timestamp"]) + resulting_event["message"].should.equal(original_message["message"]) + + res = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + limit=20, + nextToken="wrong-group@stream@999", + ) + res["events"].should.have.length_of(0) + res.should_not.have.key("nextToken") + + def test_filter_logs_paging__unknown_token(self): + res = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + limit=20, + nextToken="invalid-token", + ) + res["events"].should.have.length_of(0) + res.should_not.have.key("nextToken") + + +@mock_logs +class TestLogsFilterPattern(TestLogFilter): + def setUp(self) -> None: + super().setUp() + now = int(unix_time_millis(datetime.utcnow())) + messages = [ + {"timestamp": now, "message": "hello"}, + {"timestamp": now, "message": "world"}, + {"timestamp": now, "message": "hello world"}, + {"timestamp": now, "message": "goodbye world"}, + {"timestamp": now, "message": "hello cruela"}, + {"timestamp": now, "message": "goodbye cruel world"}, + ] + self.conn.put_log_events( + 
logGroupName=self.log_group_name, + logStreamName=self.log_stream_name, + logEvents=messages, + ) + + def test_unknown_pattern(self): + events = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + filterPattern='{$.message = "hello"}', + )["events"] + events.should.have.length_of(6) + + def test_simple_word_pattern(self): + events = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + filterPattern="hello", + )["events"] + messages = [e["message"] for e in events] + set(messages).should.equal({"hello", "hello cruela", "hello world"}) + + def test_multiple_words_pattern(self): + events = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + filterPattern="goodbye world", + )["events"] + messages = [e["message"] for e in events] + set(messages).should.equal({"goodbye world", "goodbye cruel world"}) + + def test_quoted_pattern(self): + events = self.conn.filter_log_events( + logGroupName=self.log_group_name, + logStreamNames=[self.log_stream_name], + filterPattern='"hello cruel"', + )["events"] + messages = [e["message"] for e in events] + set(messages).should.equal({"hello cruela"})