bugfix: apply cloudwatch subscription filters to newly created streams (#4962)
This commit is contained in:
parent
75ecc31ec4
commit
4f92ee8a1c
@ -297,9 +297,13 @@ class LogGroup(CloudFormationModel):
|
||||
def create_log_stream(self, log_stream_name):
|
||||
if log_stream_name in self.streams:
|
||||
raise ResourceAlreadyExistsException()
|
||||
self.streams[log_stream_name] = LogStream(
|
||||
self.region, self.name, log_stream_name
|
||||
)
|
||||
stream = LogStream(self.region, self.name, log_stream_name)
|
||||
filters = self.describe_subscription_filters()
|
||||
|
||||
if filters:
|
||||
stream.destination_arn = filters[0]["destinationArn"]
|
||||
stream.filter_name = filters[0]["filterName"]
|
||||
self.streams[log_stream_name] = stream
|
||||
|
||||
def delete_log_stream(self, log_stream_name):
|
||||
if log_stream_name not in self.streams:
|
||||
|
@ -181,6 +181,77 @@ def test_put_subscription_filter_with_lambda():
|
||||
log_events[1]["timestamp"].should.equal(ts_1)
|
||||
|
||||
|
||||
@mock_lambda
|
||||
@mock_logs
|
||||
@pytest.mark.network
|
||||
def test_subscription_filter_applies_to_new_streams():
|
||||
# given
|
||||
region_name = "us-east-1"
|
||||
client_lambda = boto3.client("lambda", region_name)
|
||||
client_logs = boto3.client("logs", region_name)
|
||||
log_group_name = "/test"
|
||||
log_stream_name = "stream"
|
||||
client_logs.create_log_group(logGroupName=log_group_name)
|
||||
function_arn = client_lambda.create_function(
|
||||
FunctionName="test",
|
||||
Runtime="python3.8",
|
||||
Role=_get_role_name(region_name),
|
||||
Handler="lambda_function.lambda_handler",
|
||||
Code={"ZipFile": _get_test_zip_file()},
|
||||
Description="test lambda function",
|
||||
Timeout=3,
|
||||
MemorySize=128,
|
||||
Publish=True,
|
||||
)["FunctionArn"]
|
||||
|
||||
# when
|
||||
client_logs.put_subscription_filter(
|
||||
logGroupName=log_group_name,
|
||||
filterName="test",
|
||||
filterPattern="",
|
||||
destinationArn=function_arn,
|
||||
)
|
||||
client_logs.create_log_stream( # create log stream after subscription filter applied
|
||||
logGroupName=log_group_name, logStreamName=log_stream_name
|
||||
)
|
||||
ts_0 = int(unix_time_millis(datetime.utcnow()))
|
||||
ts_1 = int(unix_time_millis(datetime.utcnow())) + 10
|
||||
client_logs.put_log_events(
|
||||
logGroupName=log_group_name,
|
||||
logStreamName=log_stream_name,
|
||||
logEvents=[
|
||||
{"timestamp": ts_0, "message": "test"},
|
||||
{"timestamp": ts_1, "message": "test 2"},
|
||||
],
|
||||
)
|
||||
|
||||
# then
|
||||
msg_showed_up, received_message = _wait_for_log_msg(
|
||||
client_logs, "/aws/lambda/test", "awslogs"
|
||||
)
|
||||
assert msg_showed_up, "CloudWatch log event was not found. All logs: {}".format(
|
||||
received_message
|
||||
)
|
||||
|
||||
data = json.loads(received_message)["awslogs"]["data"]
|
||||
response = json.loads(
|
||||
zlib.decompress(base64.b64decode(data), 16 + zlib.MAX_WBITS).decode("utf-8")
|
||||
)
|
||||
response["messageType"].should.equal("DATA_MESSAGE")
|
||||
response["owner"].should.equal("123456789012")
|
||||
response["logGroup"].should.equal("/test")
|
||||
response["logStream"].should.equal("stream")
|
||||
response["subscriptionFilters"].should.equal(["test"])
|
||||
log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"])
|
||||
log_events.should.have.length_of(2)
|
||||
log_events[0]["id"].should.be.a(int)
|
||||
log_events[0]["message"].should.equal("test")
|
||||
log_events[0]["timestamp"].should.equal(ts_0)
|
||||
log_events[1]["id"].should.be.a(int)
|
||||
log_events[1]["message"].should.equal("test 2")
|
||||
log_events[1]["timestamp"].should.equal(ts_1)
|
||||
|
||||
|
||||
@mock_s3
|
||||
@mock_firehose
|
||||
@mock_logs
|
||||
|
Loading…
Reference in New Issue
Block a user