Allow CloudWatch logs to be sent to Firehose (#4286)
Co-authored-by: Karri Balk <kbalk@users.noreply.github.com>
This commit is contained in:
parent
002f9979ef
commit
69c2a11f5c
@ -3,8 +3,11 @@
|
||||
Incomplete list of unfinished items:
|
||||
- The create_delivery_stream() argument
|
||||
DeliveryStreamEncryptionConfigurationInput is not supported.
|
||||
- The S3BackupMode argument is ignored as are most of the other
|
||||
destination arguments.
|
||||
- Data record size and number of transactions are ignored.
|
||||
- Better validation of delivery destination parameters, e.g.,
|
||||
validation of the url for an http endpoint (boto3 does this),
|
||||
validation of the url for an http endpoint (boto3 does this).
|
||||
- Better handling of the put_record_batch() API. Not only is
|
||||
the existing logic bare bones, but for the ElasticSearch and
|
||||
RedShift destinations, the data is just ignored.
|
||||
@ -12,8 +15,11 @@ Incomplete list of unfinished items:
|
||||
are reported back to the user. Instead an exception is raised.
|
||||
- put_record(), put_record_batch() always set "Encrypted" to False.
|
||||
"""
|
||||
from base64 import b64decode
|
||||
from base64 import b64decode, b64encode
|
||||
from datetime import datetime
|
||||
from gzip import GzipFile
|
||||
import io
|
||||
import json
|
||||
from time import time
|
||||
from uuid import uuid4
|
||||
import warnings
|
||||
@ -32,7 +38,6 @@ from moto.firehose.exceptions import (
|
||||
ResourceNotFoundException,
|
||||
ValidationException,
|
||||
)
|
||||
from moto.core.utils import get_random_hex
|
||||
from moto.s3 import s3_backend
|
||||
from moto.utilities.tagging_service import TaggingService
|
||||
|
||||
@ -163,11 +168,6 @@ class FirehoseBackend(BaseBackend):
|
||||
self.delivery_streams = {}
|
||||
self.tagger = TaggingService()
|
||||
|
||||
def lookup_name_from_arn(self, arn):
|
||||
"""Given an ARN, return the associated delivery stream name."""
|
||||
# TODO - need to test
|
||||
return self.delivery_streams.get(arn.split("/")[-1])
|
||||
|
||||
def reset(self):
|
||||
"""Re-initializes all attributes for this instance."""
|
||||
region_name = self.region_name
|
||||
@ -425,7 +425,7 @@ class FirehoseBackend(BaseBackend):
|
||||
return (
|
||||
f"{prefix}{now.strftime('%Y/%m/%d/%H')}/"
|
||||
f"{delivery_stream_name}-{version_id}-"
|
||||
f"{now.strftime('%Y-%m-%d-%H-%M-%S')}-{get_random_hex()}"
|
||||
f"{now.strftime('%Y-%m-%d-%H-%M-%S')}-{str(uuid4())}"
|
||||
)
|
||||
|
||||
def put_s3_records(self, delivery_stream_name, version_id, s3_destination, records):
|
||||
@ -621,6 +621,41 @@ class FirehoseBackend(BaseBackend):
|
||||
# S3 backup if it is disabled. If backup is enabled, you can't update
|
||||
# the delivery stream to disable it."
|
||||
|
||||
def lookup_name_from_arn(self, arn):
|
||||
"""Given an ARN, return the associated delivery stream name."""
|
||||
return self.delivery_streams.get(arn.split("/")[-1])
|
||||
|
||||
def send_log_event(
|
||||
self,
|
||||
delivery_stream_arn,
|
||||
filter_name,
|
||||
log_group_name,
|
||||
log_stream_name,
|
||||
log_events,
|
||||
): # pylint: disable=too-many-arguments
|
||||
"""Send log events to a S3 bucket after encoding and gzipping it."""
|
||||
data = {
|
||||
"logEvents": log_events,
|
||||
"logGroup": log_group_name,
|
||||
"logStream": log_stream_name,
|
||||
"messageType": "DATA_MESSAGE",
|
||||
"owner": ACCOUNT_ID,
|
||||
"subscriptionFilters": [filter_name],
|
||||
}
|
||||
|
||||
output = io.BytesIO()
|
||||
with GzipFile(fileobj=output, mode="w") as fhandle:
|
||||
fhandle.write(json.dumps(data, separators=(",", ":")).encode("utf-8"))
|
||||
gzipped_payload = b64encode(output.getvalue()).decode("utf-8")
|
||||
|
||||
delivery_stream = self.lookup_name_from_arn(delivery_stream_arn)
|
||||
self.put_s3_records(
|
||||
delivery_stream.delivery_stream_name,
|
||||
delivery_stream.version_id,
|
||||
delivery_stream.destinations[0]["S3"],
|
||||
[{"Data": gzipped_payload}],
|
||||
)
|
||||
|
||||
|
||||
firehose_backends = {}
|
||||
for available_region in Session().get_available_regions("firehose"):
|
||||
|
@ -123,10 +123,10 @@ class LogStream(BaseModel):
|
||||
self.events += events
|
||||
self.upload_sequence_token += 1
|
||||
|
||||
if self.destination_arn and self.destination_arn.split(":")[2] == "lambda":
|
||||
from moto.awslambda import lambda_backends # due to circular dependency
|
||||
|
||||
lambda_log_events = [
|
||||
service = None
|
||||
if self.destination_arn:
|
||||
service = self.destination_arn.split(":")[2]
|
||||
formatted_log_events = [
|
||||
{
|
||||
"id": event.event_id,
|
||||
"timestamp": event.timestamp,
|
||||
@ -135,12 +135,27 @@ class LogStream(BaseModel):
|
||||
for event in events
|
||||
]
|
||||
|
||||
if service == "lambda":
|
||||
from moto.awslambda import lambda_backends # due to circular dependency
|
||||
|
||||
lambda_backends[self.region].send_log_event(
|
||||
self.destination_arn,
|
||||
self.filter_name,
|
||||
log_group_name,
|
||||
log_stream_name,
|
||||
lambda_log_events,
|
||||
formatted_log_events,
|
||||
)
|
||||
elif service == "firehose":
|
||||
from moto.firehose import ( # pylint: disable=import-outside-toplevel
|
||||
firehose_backends,
|
||||
)
|
||||
|
||||
firehose_backends[self.region].send_log_event(
|
||||
self.destination_arn,
|
||||
self.filter_name,
|
||||
log_group_name,
|
||||
log_stream_name,
|
||||
formatted_log_events,
|
||||
)
|
||||
|
||||
return "{:056d}".format(self.upload_sequence_token)
|
||||
@ -849,7 +864,22 @@ class LogsBackend(BaseBackend):
|
||||
"have given CloudWatch Logs permission to execute your "
|
||||
"function."
|
||||
)
|
||||
elif service == "firehose":
|
||||
from moto.firehose import ( # pylint: disable=import-outside-toplevel
|
||||
firehose_backends,
|
||||
)
|
||||
|
||||
firehose = firehose_backends[self.region_name].lookup_name_from_arn(
|
||||
destination_arn
|
||||
)
|
||||
if not firehose:
|
||||
raise InvalidParameterException(
|
||||
"Could not deliver test message to specified Firehose "
|
||||
"stream. Check if the given Firehose stream is in ACTIVE "
|
||||
"state."
|
||||
)
|
||||
else:
|
||||
# TODO: support Kinesis stream destinations
|
||||
raise InvalidParameterException(
|
||||
f"Service '{service}' has not implemented for "
|
||||
f"put_subscription_filter()"
|
||||
|
@ -493,3 +493,30 @@ def test_update_destination():
|
||||
"Changing the destination type to or from HttpEndpoint is not "
|
||||
"supported at this time"
|
||||
) in err["Message"]
|
||||
|
||||
|
||||
@mock_firehose
|
||||
def test_lookup_name_from_arn():
|
||||
"""Test delivery stream instance can be retrieved given ARN.
|
||||
|
||||
This won't work in TEST_SERVER_MODE as this script won't have access
|
||||
to 'firehose_backends'.
|
||||
"""
|
||||
if settings.TEST_SERVER_MODE:
|
||||
raise SkipTest("Can't access firehose_backend in server mode")
|
||||
|
||||
client = boto3.client("firehose", region_name=TEST_REGION)
|
||||
s3_dest_config = sample_s3_dest_config()
|
||||
stream_name = "test_lookup"
|
||||
|
||||
arn = client.create_delivery_stream(
|
||||
DeliveryStreamName=stream_name, S3DestinationConfiguration=s3_dest_config,
|
||||
)["DeliveryStreamARN"]
|
||||
|
||||
from moto.firehose.models import ( # pylint: disable=import-outside-toplevel
|
||||
firehose_backends,
|
||||
)
|
||||
|
||||
delivery_stream = firehose_backends[TEST_REGION].lookup_name_from_arn(arn)
|
||||
assert delivery_stream.delivery_stream_arn == arn
|
||||
assert delivery_stream.delivery_stream_name == stream_name
|
||||
|
@ -1,15 +1,15 @@
|
||||
import base64
|
||||
import boto3
|
||||
from io import BytesIO
|
||||
import json
|
||||
import sure # noqa
|
||||
import time
|
||||
from zipfile import ZipFile, ZIP_DEFLATED
|
||||
import zlib
|
||||
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
from io import BytesIO
|
||||
from moto import mock_logs, mock_lambda, mock_iam
|
||||
from moto import mock_logs, mock_lambda, mock_iam, mock_firehose, mock_s3
|
||||
import pytest
|
||||
from zipfile import ZipFile, ZIP_DEFLATED
|
||||
import sure # noqa
|
||||
|
||||
|
||||
@mock_lambda
|
||||
@ -177,6 +177,95 @@ def test_put_subscription_filter_with_lambda():
|
||||
log_events[1]["timestamp"].should.equal(0)
|
||||
|
||||
|
||||
@mock_s3
|
||||
@mock_firehose
|
||||
@mock_logs
|
||||
@pytest.mark.network
|
||||
def test_put_subscription_filter_with_firehose():
|
||||
# given
|
||||
region_name = "us-east-1"
|
||||
client_firehose = boto3.client("firehose", region_name)
|
||||
client_logs = boto3.client("logs", region_name)
|
||||
|
||||
log_group_name = "/firehose-test"
|
||||
log_stream_name = "delivery-stream"
|
||||
client_logs.create_log_group(logGroupName=log_group_name)
|
||||
client_logs.create_log_stream(
|
||||
logGroupName=log_group_name, logStreamName=log_stream_name
|
||||
)
|
||||
|
||||
# Create a S3 bucket.
|
||||
bucket_name = "firehosetestbucket"
|
||||
s3_client = boto3.client("s3", region_name=region_name)
|
||||
s3_client.create_bucket(
|
||||
Bucket=bucket_name,
|
||||
CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
|
||||
)
|
||||
|
||||
# Create the Firehose delivery stream that uses that S3 bucket as
|
||||
# the destination.
|
||||
delivery_stream_name = "firehose_log_test"
|
||||
firehose_arn = client_firehose.create_delivery_stream(
|
||||
DeliveryStreamName=delivery_stream_name,
|
||||
ExtendedS3DestinationConfiguration={
|
||||
"RoleARN": _get_role_name(region_name),
|
||||
"BucketARN": f"arn:aws:s3::{bucket_name}",
|
||||
},
|
||||
)["DeliveryStreamARN"]
|
||||
|
||||
# when
|
||||
client_logs.put_subscription_filter(
|
||||
logGroupName=log_group_name,
|
||||
filterName="firehose-test",
|
||||
filterPattern="",
|
||||
destinationArn=firehose_arn,
|
||||
)
|
||||
|
||||
# then
|
||||
response = client_logs.describe_subscription_filters(logGroupName=log_group_name)
|
||||
response["subscriptionFilters"].should.have.length_of(1)
|
||||
filter = response["subscriptionFilters"][0]
|
||||
filter["creationTime"].should.be.a(int)
|
||||
filter["destinationArn"] = firehose_arn
|
||||
filter["distribution"] = "ByLogStream"
|
||||
filter["logGroupName"] = "/firehose-test"
|
||||
filter["filterName"] = "firehose-test"
|
||||
filter["filterPattern"] = ""
|
||||
|
||||
# when
|
||||
client_logs.put_log_events(
|
||||
logGroupName=log_group_name,
|
||||
logStreamName=log_stream_name,
|
||||
logEvents=[
|
||||
{"timestamp": 0, "message": "test"},
|
||||
{"timestamp": 0, "message": "test 2"},
|
||||
],
|
||||
)
|
||||
|
||||
# then
|
||||
bucket_objects = s3_client.list_objects_v2(Bucket=bucket_name)
|
||||
message = s3_client.get_object(
|
||||
Bucket=bucket_name, Key=bucket_objects["Contents"][0]["Key"]
|
||||
)
|
||||
response = json.loads(
|
||||
zlib.decompress(message["Body"].read(), 16 + zlib.MAX_WBITS).decode("utf-8")
|
||||
)
|
||||
|
||||
response["messageType"].should.equal("DATA_MESSAGE")
|
||||
response["owner"].should.equal("123456789012")
|
||||
response["logGroup"].should.equal("/firehose-test")
|
||||
response["logStream"].should.equal("delivery-stream")
|
||||
response["subscriptionFilters"].should.equal(["firehose-test"])
|
||||
log_events = sorted(response["logEvents"], key=lambda log_event: log_event["id"])
|
||||
log_events.should.have.length_of(2)
|
||||
log_events[0]["id"].should.be.a(int)
|
||||
log_events[0]["message"].should.equal("test")
|
||||
log_events[0]["timestamp"].should.equal(0)
|
||||
log_events[1]["id"].should.be.a(int)
|
||||
log_events[1]["message"].should.equal("test 2")
|
||||
log_events[1]["timestamp"].should.equal(0)
|
||||
|
||||
|
||||
@mock_lambda
|
||||
@mock_logs
|
||||
def test_delete_subscription_filter():
|
||||
|
Loading…
Reference in New Issue
Block a user