CloudwatchLogs - implement validation for old/new messages (#4565)

This commit is contained in:
Bert Blommers 2021-11-12 16:22:47 -01:00 committed by GitHub
parent 756f772c5c
commit 16685d3407
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 200 additions and 42 deletions

View File

@@ -26,6 +27,7 @@ from moto.ec2.exceptions import InvalidSubnetIdError
from moto.ec2.models import INSTANCE_TYPES as EC2_INSTANCE_TYPES
from moto.iam.exceptions import IAMNotFoundException
from moto.core import ACCOUNT_ID as DEFAULT_ACCOUNT_ID
from moto.core.utils import unix_time_millis
from moto.utilities.docker_utilities import DockerModel, parse_image_ref
from ..utilities.tagging_service import TaggingService
@@ -582,11 +583,8 @@ class Job(threading.Thread, BaseModel, DockerModel):
logs = []
for line in logs_stdout + logs_stderr:
date, line = line.split(" ", 1)
date = dateutil.parser.parse(date)
# TODO: Replace with int(date.timestamp()) once we yeet Python2 out of the window
date = int(
(time.mktime(date.timetuple()) + date.microsecond / 1000000.0)
)
date_obj = dateutil.parser.parse(date, ignoretz=True)
date = unix_time_millis(date_obj)
logs.append({"timestamp": date, "message": line.strip()})
# Send to cloudwatch

View File

@@ -15,7 +15,11 @@ from boto3 import Session
from collections import OrderedDict
from moto.core.exceptions import JsonRESTError
from moto.core import ACCOUNT_ID, BaseBackend, CloudFormationModel, BaseModel
from moto.core.utils import unix_time, iso_8601_datetime_without_milliseconds
from moto.core.utils import (
unix_time,
unix_time_millis,
iso_8601_datetime_without_milliseconds,
)
from moto.events.exceptions import (
ValidationException,
ResourceNotFoundException,
@@ -176,7 +180,7 @@ class Rule(CloudFormationModel):
log_stream_name = str(uuid4())
log_events = [
{
"timestamp": unix_time(datetime.utcnow()),
"timestamp": unix_time_millis(datetime.utcnow()),
"message": json.dumps(event_copy),
}
]

View File

@@ -1,6 +1,7 @@
import uuid
from boto3 import Session
from datetime import datetime, timedelta
from moto import core as moto_core
from moto.core import BaseBackend, BaseModel
@@ -742,9 +743,30 @@ class LogsBackend(BaseBackend):
if log_group_name not in self.groups:
raise ResourceNotFoundException()
log_group = self.groups[log_group_name]
return log_group.put_log_events(
log_group_name, log_stream_name, log_events, sequence_token
# Only events from the last 14 days or 2 hours in the future are accepted
rejected_info = {}
allowed_events = []
last_timestamp = None
oldest = int(unix_time_millis(datetime.utcnow() - timedelta(days=14)))
newest = int(unix_time_millis(datetime.utcnow() + timedelta(hours=2)))
for idx, event in enumerate(log_events):
if last_timestamp and last_timestamp > event["timestamp"]:
raise InvalidParameterException(
"Log events in a single PutLogEvents request must be in chronological order."
)
if event["timestamp"] < oldest:
rejected_info["tooOldLogEventEndIndex"] = idx
elif event["timestamp"] > newest:
rejected_info["tooNewLogEventStartIndex"] = idx
else:
allowed_events.append(event)
last_timestamp = event["timestamp"]
token = log_group.put_log_events(
log_group_name, log_stream_name, allowed_events, sequence_token
)
return token, rejected_info
def get_log_events(
self,

View File

@@ -214,9 +214,17 @@ class LogsResponse(BaseResponse):
log_events = self._get_param("logEvents")
sequence_token = self._get_param("sequenceToken")
next_sequence_token = self.logs_backend.put_log_events(
next_sequence_token, rejected_info = self.logs_backend.put_log_events(
log_group_name, log_stream_name, log_events, sequence_token
)
if rejected_info:
return json.dumps(
{
"nextSequenceToken": next_sequence_token,
"rejectedLogEventsInfo": rejected_info,
}
)
else:
return json.dumps({"nextSequenceToken": next_sequence_token})
def get_log_events(self):

View File

@@ -36,7 +36,7 @@ def test_send_to_cw_log_group():
)
# when
event_time = datetime(2021, 1, 1, 12, 23, 34)
event_time = datetime.utcnow()
client_events.put_events(
Entries=[
{

View File

@@ -8,7 +8,9 @@ import zlib
import boto3
from botocore.exceptions import ClientError
from datetime import datetime
from moto import mock_logs, mock_lambda, mock_iam, mock_firehose, mock_s3
from moto.core.utils import unix_time_millis
import pytest
@@ -141,12 +143,14 @@ def test_put_subscription_filter_with_lambda():
sub_filter["filterPattern"] = ""
# when
ts_0 = int(unix_time_millis(datetime.utcnow()))
ts_1 = int(unix_time_millis(datetime.utcnow())) + 10
client_logs.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{"timestamp": 0, "message": "test"},
{"timestamp": 0, "message": "test 2"},
{"timestamp": ts_0, "message": "test"},
{"timestamp": ts_1, "message": "test 2"},
],
)
@@ -171,10 +175,10 @@ def test_put_subscription_filter_with_lambda():
log_events.should.have.length_of(2)
log_events[0]["id"].should.be.a(int)
log_events[0]["message"].should.equal("test")
log_events[0]["timestamp"].should.equal(0)
log_events[0]["timestamp"].should.equal(ts_0)
log_events[1]["id"].should.be.a(int)
log_events[1]["message"].should.equal("test 2")
log_events[1]["timestamp"].should.equal(0)
log_events[1]["timestamp"].should.equal(ts_1)
@mock_s3
@@ -233,12 +237,14 @@ def test_put_subscription_filter_with_firehose():
_filter["filterPattern"] = ""
# when
ts_0 = int(unix_time_millis(datetime.utcnow()))
ts_1 = int(unix_time_millis(datetime.utcnow()))
client_logs.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{"timestamp": 0, "message": "test"},
{"timestamp": 0, "message": "test 2"},
{"timestamp": ts_0, "message": "test"},
{"timestamp": ts_1, "message": "test 2"},
],
)
@@ -260,10 +266,10 @@ def test_put_subscription_filter_with_firehose():
log_events.should.have.length_of(2)
log_events[0]["id"].should.be.a(int)
log_events[0]["message"].should.equal("test")
log_events[0]["timestamp"].should.equal(0)
log_events[0]["timestamp"].should.equal(ts_0)
log_events[1]["id"].should.be.a(int)
log_events[1]["message"].should.equal("test 2")
log_events[1]["timestamp"].should.equal(0)
log_events[1]["timestamp"].should.equal(ts_1)
@mock_lambda

View File

@@ -3,7 +3,7 @@ import os
import time
import sure # noqa # pylint: disable=unused-import
from unittest import SkipTest
from datetime import timedelta
from datetime import timedelta, datetime
import boto3
import pytest
@@ -401,8 +401,8 @@ def test_put_logs():
conn.create_log_group(logGroupName=log_group_name)
conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
messages = [
{"timestamp": 0, "message": "hello"},
{"timestamp": 0, "message": "world"},
{"timestamp": int(unix_time_millis()), "message": "hello"},
{"timestamp": int(unix_time_millis()), "message": "world"},
]
put_results = conn.put_log_events(
logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages
@@ -465,6 +465,116 @@ def test_filter_logs_raises_if_filter_pattern():
)
@mock_logs
def test_put_log_events_in_wrong_order():
    """Events whose timestamps decrease within one request must be rejected.

    The backend raises InvalidParameterException rather than silently
    reordering or dropping the out-of-order events.
    """
    client = boto3.client("logs", "us-east-1")
    group = "test"
    stream = "teststream"
    client.create_log_group(logGroupName=group)
    client.create_log_stream(logGroupName=group, logStreamName=stream)
    # The second event is deliberately older than the first (5 days vs 2 days ago).
    now = datetime.utcnow()
    timestamps = [
        int(unix_time_millis(now - timedelta(days=2))),
        int(unix_time_millis(now - timedelta(days=5))),
    ]
    events = []
    for idx, ts in enumerate(timestamps):
        events.append({"message": f"Message {idx}", "timestamp": ts})
    with pytest.raises(ClientError) as exc:
        client.put_log_events(
            logGroupName=group,
            logStreamName=stream,
            logEvents=events,
            sequenceToken="49599396607703531511419593985621160512859251095480828066",
        )
    err = exc.value.response["Error"]
    err["Code"].should.equal("InvalidParameterException")
    err["Message"].should.equal(
        "Log events in a single PutLogEvents request must be in chronological order."
    )
@mock_logs
@pytest.mark.parametrize("days_ago", [15, 400])
def test_put_log_events_in_the_past(days_ago):
    """Events older than the 14-day acceptance window are rejected.

    AWS does not raise for too-old events; it reports them back via
    ``rejectedLogEventsInfo.tooOldLogEventEndIndex`` in the response.
    """
    conn = boto3.client("logs", "us-east-1")
    log_group_name = "test"
    log_stream_name = "teststream"
    conn.create_log_group(logGroupName=log_group_name)
    conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
    timestamp = int(unix_time_millis(datetime.utcnow() - timedelta(days=days_ago)))
    # Fixed: the message previously contained a stray, never-filled "{}" placeholder.
    messages = [{"message": "Message number 0", "timestamp": timestamp}]
    resp = conn.put_log_events(
        logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages
    )
    # The only event (index 0) falls before the accepted window.
    resp.should.have.key("rejectedLogEventsInfo").should.equal(
        {"tooOldLogEventEndIndex": 0}
    )
@mock_logs
@pytest.mark.parametrize("minutes", [181, 300, 999999])
def test_put_log_events_in_the_future(minutes):
    """Events too far in the future are rejected.

    AWS does not raise for too-new events; it reports them back via
    ``rejectedLogEventsInfo.tooNewLogEventStartIndex`` in the response.
    """
    conn = boto3.client("logs", "us-east-1")
    log_group_name = "test"
    log_stream_name = "teststream"
    conn.create_log_group(logGroupName=log_group_name)
    conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
    timestamp = int(unix_time_millis(datetime.utcnow() + timedelta(minutes=minutes)))
    # Fixed: the message previously contained a stray, never-filled "{}" placeholder.
    messages = [{"message": "Message number 0", "timestamp": timestamp}]
    resp = conn.put_log_events(
        logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=messages
    )
    # The only event (index 0) falls after the accepted window.
    resp.should.have.key("rejectedLogEventsInfo").should.equal(
        {"tooNewLogEventStartIndex": 0}
    )
@mock_logs
def test_put_log_events_now():
    """A mixed batch: acceptable events are stored, too-new ones are reported back."""
    client = boto3.client("logs", "us-east-1")
    group = "test"
    stream = "teststream"
    client.create_log_group(logGroupName=group)
    client.create_log_stream(logGroupName=group, logStreamName=stream)
    # Now and +5 minutes are within the accepted window; +1 day is not.
    ts_now = int(unix_time_millis())
    ts_soon = int(unix_time_millis(datetime.utcnow() + timedelta(minutes=5)))
    ts_tomorrow = int(unix_time_millis(datetime.utcnow() + timedelta(days=1)))
    batch = [
        {"message": f"Message {idx}", "timestamp": ts}
        for idx, ts in enumerate([ts_now, ts_soon, ts_tomorrow])
    ]
    resp = client.put_log_events(
        logGroupName=group,
        logStreamName=stream,
        logEvents=batch,
        sequenceToken="49599396607703531511419593985621160512859251095480828066",
    )
    # Only the third event (index 2) was too new.
    resp.should.have.key("rejectedLogEventsInfo").should.equal(
        {"tooNewLogEventStartIndex": 2}
    )
    # The first two events were persisted.
    stored = client.filter_log_events(
        logGroupName=group, logStreamNames=[stream], limit=20
    )["events"]
    stored_messages = [event["message"] for event in stored]
    stored_messages.should.contain("Message 0")
    stored_messages.should.contain("Message 1")
    stored_messages.shouldnt.contain("Message 2")
@mock_logs
def test_filter_logs_paging():
conn = boto3.client("logs", TEST_REGION)
@@ -472,7 +582,7 @@ def test_filter_logs_paging():
log_stream_name = "stream/stage"
conn.create_log_group(logGroupName=log_group_name)
conn.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
timestamp = int(time.time())
timestamp = int(unix_time_millis(datetime.utcnow()))
messages = []
for i in range(25):
messages.append(
@@ -700,7 +810,11 @@ def test_get_log_events():
client.create_log_group(logGroupName=log_group_name)
client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
events = [{"timestamp": x, "message": str(x)} for x in range(20)]
data = [
(int(unix_time_millis(datetime.utcnow() + timedelta(milliseconds=x))), str(x))
for x in range(20)
]
events = [{"timestamp": x, "message": y} for x, y in data]
client.put_log_events(
logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=events
@@ -711,9 +825,9 @@ def test_get_log_events():
)
resp["events"].should.have.length_of(10)
for i in range(10):
resp["events"][i]["timestamp"].should.equal(i + 10)
resp["events"][i]["message"].should.equal(str(i + 10))
for idx, (x, y) in enumerate(data[10:]):
resp["events"][idx]["timestamp"].should.equal(x)
resp["events"][idx]["message"].should.equal(y)
resp["nextForwardToken"].should.equal(
"f/00000000000000000000000000000000000000000000000000000019"
)
@@ -729,9 +843,9 @@ def test_get_log_events():
)
resp["events"].should.have.length_of(10)
for i in range(10):
resp["events"][i]["timestamp"].should.equal(i)
resp["events"][i]["message"].should.equal(str(i))
for idx, (x, y) in enumerate(data[0:10]):
resp["events"][idx]["timestamp"].should.equal(x)
resp["events"][idx]["message"].should.equal(y)
resp["nextForwardToken"].should.equal(
"f/00000000000000000000000000000000000000000000000000000009"
)
@@ -762,8 +876,9 @@ def test_get_log_events():
)
resp["events"].should.have.length_of(1)
resp["events"][0]["timestamp"].should.equal(1)
resp["events"][0]["message"].should.equal(str(1))
x, y = data[1]
resp["events"][0]["timestamp"].should.equal(x)
resp["events"][0]["message"].should.equal(y)
resp["nextForwardToken"].should.equal(
"f/00000000000000000000000000000000000000000000000000000001"
)
@@ -780,7 +895,11 @@ def test_get_log_events_with_start_from_head():
client.create_log_group(logGroupName=log_group_name)
client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
events = [{"timestamp": x, "message": str(x)} for x in range(20)]
data = [
(int(unix_time_millis(datetime.utcnow() + timedelta(milliseconds=x))), str(x))
for x in range(20)
]
events = [{"timestamp": x, "message": y} for x, y in data]
client.put_log_events(
logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=events
@@ -794,9 +913,9 @@ def test_get_log_events_with_start_from_head():
)
resp["events"].should.have.length_of(10)
for i in range(10):
resp["events"][i]["timestamp"].should.equal(i)
resp["events"][i]["message"].should.equal(str(i))
for idx, (x, y) in enumerate(data[0:10]):
resp["events"][idx]["timestamp"].should.equal(x)
resp["events"][idx]["message"].should.equal(y)
resp["nextForwardToken"].should.equal(
"f/00000000000000000000000000000000000000000000000000000009"
)
@@ -812,9 +931,9 @@ def test_get_log_events_with_start_from_head():
)
resp["events"].should.have.length_of(10)
for i in range(10):
resp["events"][i]["timestamp"].should.equal(i + 10)
resp["events"][i]["message"].should.equal(str(i + 10))
for idx, (x, y) in enumerate(data[10:]):
resp["events"][idx]["timestamp"].should.equal(x)
resp["events"][idx]["message"].should.equal(y)
resp["nextForwardToken"].should.equal(
"f/00000000000000000000000000000000000000000000000000000019"
)
@@ -845,8 +964,9 @@ def test_get_log_events_with_start_from_head():
)
resp["events"].should.have.length_of(1)
resp["events"][0]["timestamp"].should.equal(18)
resp["events"][0]["message"].should.equal(str(18))
x, y = data[18]
resp["events"][0]["timestamp"].should.equal(x)
resp["events"][0]["message"].should.equal(y)
resp["nextForwardToken"].should.equal(
"f/00000000000000000000000000000000000000000000000000000018"
)