From 3bc18455a2dad70148dc095435a55b2a43eaeac1 Mon Sep 17 00:00:00 2001
From: ljakimczuk <39192420+ljakimczuk@users.noreply.github.com>
Date: Mon, 28 Sep 2020 08:16:06 +0200
Subject: [PATCH] WIP: Introducing VPC Flow Logs (#3337)
* Start working on flow logs
* Change test
* Constructing tests
* Changing exceptions and adding more tests
* Adding more tests
* Changing model and adding more tests
* Adding support for tags
* Mocking Access error with non-existing Log Group Name
* Adding FlowLogAlreadyExists support
* Changing style
* Reformatted code
* Reformatted tests
* Removing needless test
* Adding support for CloudFormation
* Reformatting slightly
* Removing arnparse and using split
* Rearranging tests
* Fixing FilterNotImplementedError test
* Moving imports to 'if' clauses and adding explicit test for 'cloud-watch-logs' type
* Setting names matching boto3 API and restoring 'not-implementd-filter' test
* Reformatting tests with black
---
moto/ec2/exceptions.py | 53 +++
moto/ec2/models.py | 304 ++++++++++++++
moto/ec2/responses/__init__.py | 2 +
moto/ec2/responses/flow_logs.py | 122 ++++++
moto/ec2/utils.py | 5 +
tests/test_ec2/test_flow_logs.py | 678 +++++++++++++++++++++++++++++++
6 files changed, 1164 insertions(+)
create mode 100644 moto/ec2/responses/flow_logs.py
create mode 100644 tests/test_ec2/test_flow_logs.py
diff --git a/moto/ec2/exceptions.py b/moto/ec2/exceptions.py
index 4c47adbb9..b2d7e8aab 100644
--- a/moto/ec2/exceptions.py
+++ b/moto/ec2/exceptions.py
@@ -71,6 +71,24 @@ class InvalidSubnetIdError(EC2ClientError):
)
+class InvalidFlowLogIdError(EC2ClientError):
+ def __init__(self, count, flow_log_ids):
+ super(InvalidFlowLogIdError, self).__init__(
+ "InvalidFlowLogId.NotFound",
+ "These flow log ids in the input list are not found: [TotalCount: {0}] {1}".format(
+ count, flow_log_ids
+ ),
+ )
+
+
+class FlowLogAlreadyExists(EC2ClientError):
+ def __init__(self):
+ super(FlowLogAlreadyExists, self).__init__(
+ "FlowLogAlreadyExists",
+ "Error. There is an existing Flow Log with the same configuration and log destination.",
+ )
+
+
class InvalidNetworkAclIdError(EC2ClientError):
def __init__(self, network_acl_id):
super(InvalidNetworkAclIdError, self).__init__(
@@ -263,6 +281,14 @@ class InvalidAddressError(EC2ClientError):
)
+class LogDestinationNotFoundError(EC2ClientError):
+ def __init__(self, bucket_name):
+ super(LogDestinationNotFoundError, self).__init__(
+ "LogDestinationNotFoundException",
+ "LogDestination: '{0}' does not exist.".format(bucket_name),
+ )
+
+
class InvalidAllocationIdError(EC2ClientError):
def __init__(self, allocation_id):
super(InvalidAllocationIdError, self).__init__(
@@ -309,6 +335,33 @@ class InvalidVPCPeeringConnectionStateTransitionError(EC2ClientError):
)
+class InvalidDependantParameterError(EC2ClientError):
+ def __init__(self, dependant_parameter, parameter, parameter_value):
+ super(InvalidDependantParameterError, self).__init__(
+ "InvalidParameter",
+ "{0} can't be empty if {1} is {2}.".format(
+ dependant_parameter, parameter, parameter_value,
+ ),
+ )
+
+
+class InvalidDependantParameterTypeError(EC2ClientError):
+ def __init__(self, dependant_parameter, parameter_value, parameter):
+ super(InvalidDependantParameterTypeError, self).__init__(
+ "InvalidParameter",
+ "{0} type must be {1} if {2} is provided.".format(
+ dependant_parameter, parameter_value, parameter,
+ ),
+ )
+
+
+class InvalidAggregationIntervalParameterError(EC2ClientError):
+ def __init__(self, parameter):
+ super(InvalidAggregationIntervalParameterError, self).__init__(
+ "InvalidParameter", "Invalid {0}".format(parameter),
+ )
+
+
class InvalidParameterValueError(EC2ClientError):
def __init__(self, parameter_value):
super(InvalidParameterValueError, self).__init__(
diff --git a/moto/ec2/models.py b/moto/ec2/models.py
index d6d92da43..e85dab800 100644
--- a/moto/ec2/models.py
+++ b/moto/ec2/models.py
@@ -28,11 +28,13 @@ from moto.core.utils import (
camelcase_to_underscores,
)
from moto.core import ACCOUNT_ID
+
from .exceptions import (
CidrLimitExceeded,
DependencyViolationError,
EC2ClientError,
FilterNotImplementedError,
+ FlowLogAlreadyExists,
GatewayNotAttachedError,
InvalidAddressError,
InvalidAllocationIdError,
@@ -52,6 +54,10 @@ from .exceptions import (
InvalidKeyPairDuplicateError,
InvalidKeyPairFormatError,
InvalidKeyPairNameError,
+ InvalidAggregationIntervalParameterError,
+ InvalidDependantParameterError,
+ InvalidDependantParameterTypeError,
+ InvalidFlowLogIdError,
InvalidLaunchTemplateNameError,
InvalidNetworkAclIdError,
InvalidNetworkAttachmentIdError,
@@ -123,6 +129,7 @@ from .utils import (
random_spot_request_id,
random_subnet_id,
random_subnet_association_id,
+ random_flow_log_id,
random_volume_id,
random_vpc_id,
random_vpc_cidr_association_id,
@@ -1176,6 +1183,7 @@ class TagBackend(object):
"subnet",
"volume",
"vpc",
+ "vpc-flow-log",
"vpc-peering-connection" "vpn-connection",
"vpn-gateway",
]
@@ -3524,6 +3532,301 @@ class SubnetBackend(object):
raise InvalidParameterValueError(attr_name)
+class Unsuccessful(object):
+ def __init__(
+ self, resource_id, error_code, error_message,
+ ):
+ self.resource_id = resource_id
+ self.error_code = error_code
+ self.error_message = error_message
+
+
+class FlowLogs(TaggedEC2Resource, CloudFormationModel):
+ def __init__(
+ self,
+ ec2_backend,
+ flow_log_id,
+ resource_id,
+ traffic_type,
+ log_destination,
+ log_group_name,
+ deliver_logs_permission_arn,
+ max_aggregation_interval,
+ log_destination_type,
+ log_format,
+ deliver_logs_status="SUCCESS",
+ deliver_logs_error_message=None,
+ ):
+ self.ec2_backend = ec2_backend
+ self.id = flow_log_id
+ self.resource_id = resource_id
+ self.traffic_type = traffic_type
+ self.log_destination = log_destination
+ self.log_group_name = log_group_name
+ self.deliver_logs_permission_arn = deliver_logs_permission_arn
+ self.deliver_logs_status = deliver_logs_status
+ self.deliver_logs_error_message = deliver_logs_error_message
+ self.max_aggregation_interval = max_aggregation_interval
+ self.log_destination_type = log_destination_type
+ self.log_format = log_format
+
+ self.created_at = utc_date_and_time()
+
+ @staticmethod
+ def cloudformation_name_type():
+ return None
+
+ @staticmethod
+ def cloudformation_type():
+ # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-flowlog.html
+ return "AWS::EC2::FlowLog"
+
+ @classmethod
+ def create_from_cloudformation_json(
+ cls, resource_name, cloudformation_json, region_name
+ ):
+ properties = cloudformation_json["Properties"]
+
+ resource_type = properties.get("ResourceType")
+ resource_id = [properties.get("ResourceId")]
+ traffic_type = properties.get("TrafficType")
+ deliver_logs_permission_arn = properties.get("DeliverLogsPermissionArn")
+ log_destination_type = properties.get("LogDestinationType")
+ log_destination = properties.get("LogDestination")
+ log_group_name = properties.get("LogGroupName")
+ log_format = properties.get("LogFormat")
+ max_aggregation_interval = properties.get("MaxAggregationInterval")
+
+ ec2_backend = ec2_backends[region_name]
+ flow_log, _ = ec2_backend.create_flow_logs(
+ resource_type,
+ resource_id,
+ traffic_type,
+ deliver_logs_permission_arn,
+ log_destination_type,
+ log_destination,
+ log_group_name,
+ log_format,
+ max_aggregation_interval,
+ )
+ for tag in properties.get("Tags", []):
+ tag_key = tag["Key"]
+ tag_value = tag["Value"]
+ flow_log[0].add_tag(tag_key, tag_value)
+
+ return flow_log[0]
+
+ @property
+ def physical_resource_id(self):
+ return self.id
+
+ def get_filter_value(self, filter_name):
+ """
+ API Version 2016-11-15 defines the following filters for DescribeFlowLogs:
+
+ * deliver-log-status
+ * log-destination-type
+ * flow-log-id
+ * log-group-name
+ * resource-id
+ * traffic-type
+ * tag:key=value
+ * tag-key
+
+ Taken from: https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeFlowLogs.html
+ """
+ if filter_name == "resource-id":
+ return self.resource_id
+ elif filter_name == "traffic-type":
+ return self.traffic_type
+ elif filter_name == "log-destination-type":
+ return self.log_destination_type
+ elif filter_name == "flow-log-id":
+ return self.id
+ elif filter_name == "log-group-name":
+ return self.log_group_name
+ elif filter_name == "deliver-log-status":
+ return "SUCCESS"
+ else:
+ return super(FlowLogs, self).get_filter_value(
+ filter_name, "DescribeFlowLogs"
+ )
+
+
+class FlowLogsBackend(object):
+ def __init__(self):
+ self.flow_logs = defaultdict(dict)
+ super(FlowLogsBackend, self).__init__()
+
+ def _validate_request(
+ self,
+ log_group_name,
+ log_destination,
+ log_destination_type,
+ max_aggregation_interval,
+ deliver_logs_permission_arn,
+ ):
+ if log_group_name is None and log_destination is None:
+ raise InvalidDependantParameterError(
+ "LogDestination", "LogGroupName", "not provided",
+ )
+
+ if log_destination_type == "s3":
+ if log_group_name is not None:
+ raise InvalidDependantParameterTypeError(
+ "LogDestination", "cloud-watch-logs", "LogGroupName",
+ )
+ elif log_destination_type == "cloud-watch-logs":
+ if deliver_logs_permission_arn is None:
+ raise InvalidDependantParameterError(
+ "DeliverLogsPermissionArn",
+ "LogDestinationType",
+ "cloud-watch-logs",
+ )
+
+ if max_aggregation_interval not in ["60", "600"]:
+ raise InvalidAggregationIntervalParameterError(
+ "Flow Log Max Aggregation Interval"
+ )
+
+ def create_flow_logs(
+ self,
+ resource_type,
+ resource_ids,
+ traffic_type,
+ deliver_logs_permission_arn,
+ log_destination_type,
+ log_destination,
+ log_group_name,
+ log_format,
+ max_aggregation_interval,
+ ):
+ # Guess it's best to put it here due to possible
+ # lack of them in the CloudFormation template
+ max_aggregation_interval = (
+ "600" if max_aggregation_interval is None else max_aggregation_interval
+ )
+ log_destination_type = (
+ "cloud-watch-logs" if log_destination_type is None else log_destination_type
+ )
+ log_format = (
+ "${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}"
+ if log_format is None
+ else log_format
+ )
+
+ # Validate the requests paremeters
+ self._validate_request(
+ log_group_name,
+ log_destination,
+ log_destination_type,
+ max_aggregation_interval,
+ deliver_logs_permission_arn,
+ )
+
+ flow_logs_set = []
+ unsuccessful = []
+
+ for resource_id in resource_ids:
+ deliver_logs_status = "SUCCESS"
+ deliver_logs_error_message = None
+ flow_log_id = random_flow_log_id()
+ if resource_type == "VPC":
+ # Validate VPCs exist
+ self.get_vpc(resource_id)
+ elif resource_type == "Subnet":
+ # Validate Subnets exist
+ self.get_subnet(resource_id)
+ elif resource_type == "NetworkInterface":
+ # Validate NetworkInterfaces exist
+ self.get_network_interface(resource_id)
+
+ if log_destination_type == "s3":
+ from moto.s3.models import s3_backend
+ from moto.s3.exceptions import MissingBucket
+
+ arn = log_destination.split(":", 5)[5]
+ try:
+ s3_backend.get_bucket(arn)
+ except MissingBucket:
+ unsuccessful.append(
+ # Instead of creating FlowLog report
+ # the unsuccessful status for the
+ # given resource_id
+ Unsuccessful(
+ resource_id,
+ "400",
+ "LogDestination: {0} does not exist.".format(arn),
+ )
+ )
+ continue
+ elif log_destination_type == "cloud-watch-logs":
+ from moto.logs.models import logs_backends
+ from moto.logs.exceptions import ResourceNotFoundException
+
+ # API allows to create a FlowLog with a
+ # non-existing LogGroup. It however later
+ # on reports the FAILED delivery status.
+ try:
+ # Need something easy to check the group exists.
+ # The list_tags_log_group seems to do the trick.
+ logs_backends[self.region_name].list_tags_log_group(log_group_name)
+ except ResourceNotFoundException:
+ deliver_logs_status = "FAILED"
+ deliver_logs_error_message = "Access error"
+
+ all_flow_logs = self.describe_flow_logs()
+ if any(
+ fl.resource_id == resource_id
+ and (
+ fl.log_group_name == log_group_name
+ or fl.log_destination == log_destination
+ )
+ for fl in all_flow_logs
+ ):
+ raise FlowLogAlreadyExists()
+ flow_logs = FlowLogs(
+ self,
+ flow_log_id,
+ resource_id,
+ traffic_type,
+ log_destination,
+ log_group_name,
+ deliver_logs_permission_arn,
+ max_aggregation_interval,
+ log_destination_type,
+ log_format,
+ deliver_logs_status,
+ deliver_logs_error_message,
+ )
+ self.flow_logs[flow_log_id] = flow_logs
+ flow_logs_set.append(flow_logs)
+
+ return flow_logs_set, unsuccessful
+
+ def describe_flow_logs(self, flow_log_ids=None, filters=None):
+ matches = itertools.chain([i for i in self.flow_logs.values()])
+ if flow_log_ids:
+ matches = [flow_log for flow_log in matches if flow_log.id in flow_log_ids]
+ if filters:
+ matches = generic_filter(filters, matches)
+ return matches
+
+ def delete_flow_logs(self, flow_log_ids):
+ non_existing = []
+ for flow_log in flow_log_ids:
+ if flow_log in self.flow_logs:
+ self.flow_logs.pop(flow_log, None)
+ else:
+ non_existing.append(flow_log)
+
+ if non_existing:
+ raise InvalidFlowLogIdError(
+ len(flow_log_ids), " ".join(x for x in flow_log_ids),
+ )
+ return True
+
+
class SubnetRouteTableAssociation(CloudFormationModel):
def __init__(self, route_table_id, subnet_id):
self.route_table_id = route_table_id
@@ -5530,6 +5833,7 @@ class EC2Backend(
VPCBackend,
SubnetBackend,
SubnetRouteTableAssociationBackend,
+ FlowLogsBackend,
NetworkInterfaceBackend,
VPNConnectionBackend,
VPCPeeringConnectionBackend,
diff --git a/moto/ec2/responses/__init__.py b/moto/ec2/responses/__init__.py
index 21cbf8249..893a25e89 100644
--- a/moto/ec2/responses/__init__.py
+++ b/moto/ec2/responses/__init__.py
@@ -24,6 +24,7 @@ from .security_groups import SecurityGroups
from .spot_fleets import SpotFleets
from .spot_instances import SpotInstances
from .subnets import Subnets
+from .flow_logs import FlowLogs
from .tags import TagResponse
from .virtual_private_gateways import VirtualPrivateGateways
from .vm_export import VMExport
@@ -60,6 +61,7 @@ class EC2Response(
SpotFleets,
SpotInstances,
Subnets,
+ FlowLogs,
TagResponse,
VirtualPrivateGateways,
VMExport,
diff --git a/moto/ec2/responses/flow_logs.py b/moto/ec2/responses/flow_logs.py
new file mode 100644
index 000000000..9978f89c2
--- /dev/null
+++ b/moto/ec2/responses/flow_logs.py
@@ -0,0 +1,122 @@
+from __future__ import unicode_literals
+from moto.core.responses import BaseResponse
+from moto.ec2.models import validate_resource_ids
+from moto.ec2.utils import filters_from_querystring
+
+
+class FlowLogs(BaseResponse):
+ def create_flow_logs(self):
+ resource_type = self._get_param("ResourceType")
+ resource_ids = self._get_multi_param("ResourceId")
+ traffic_type = self._get_param("TrafficType")
+ deliver_logs_permission_arn = self._get_param("DeliverLogsPermissionArn")
+ log_destination_type = self._get_param("LogDestinationType")
+ log_destination = self._get_param("LogDestination")
+ log_group_name = self._get_param("LogGroupName")
+ log_format = self._get_param("LogFormat")
+ max_aggregation_interval = self._get_param("MaxAggregationInterval")
+ validate_resource_ids(resource_ids)
+
+ tags = self._parse_tag_specification("TagSpecification")
+ tags = tags.get("vpc-flow-log", {})
+ if self.is_not_dryrun("CreateFlowLogs"):
+ flow_logs, errors = self.ec2_backend.create_flow_logs(
+ resource_type=resource_type,
+ resource_ids=resource_ids,
+ traffic_type=traffic_type,
+ deliver_logs_permission_arn=deliver_logs_permission_arn,
+ log_destination_type=log_destination_type,
+ log_destination=log_destination,
+ log_group_name=log_group_name,
+ log_format=log_format,
+ max_aggregation_interval=max_aggregation_interval,
+ )
+ for fl in flow_logs:
+ fl.add_tags(tags)
+ template = self.response_template(CREATE_FLOW_LOGS_RESPONSE)
+ return template.render(flow_logs=flow_logs, errors=errors)
+
+ def describe_flow_logs(self):
+ flow_log_ids = self._get_multi_param("FlowLogId")
+ filters = filters_from_querystring(self.querystring)
+ flow_logs = self.ec2_backend.describe_flow_logs(flow_log_ids, filters)
+ if self.is_not_dryrun("DescribeFlowLogs"):
+ template = self.response_template(DESCRIBE_FLOW_LOGS_RESPONSE)
+ return template.render(flow_logs=flow_logs)
+
+ def delete_flow_logs(self):
+ flow_log_ids = self._get_multi_param("FlowLogId")
+ self.ec2_backend.delete_flow_logs(flow_log_ids)
+ if self.is_not_dryrun("DeleteFlowLogs"):
+ template = self.response_template(DELETE_FLOW_LOGS_RESPONSE)
+ return template.render()
+
+
+CREATE_FLOW_LOGS_RESPONSE = """
+
+ 2d96dae3-504b-4fc4-bf50-266EXAMPLE
+
+ {% for error in errors %}
+ -
+
+
{{ error.error_code }}
+ {{ error.error_message }}
+
+ {{ error.resource_id }}
+
+ {% endfor %}
+
+
+ {% for flow_log in flow_logs %}
+ - {{ flow_log.id }}
+ {% endfor %}
+
+"""
+
+DELETE_FLOW_LOGS_RESPONSE = """
+
+ c5c4f51f-f4e9-42bc-8700-EXAMPLE
+
+"""
+
+DESCRIBE_FLOW_LOGS_RESPONSE = """
+
+ 3cb46f23-099e-4bf0-891c-EXAMPLE
+
+ {% for flow_log in flow_logs %}
+ -
+ {% if flow_log.log_destination is not none %}
+ {{ flow_log.log_destination }}
+ {% endif %}
+ {{ flow_log.resource_id }}
+ {{ flow_log.log_destination_type }}
+ {{ flow_log.created_at }}
+ {{ flow_log.traffic_type }}
+ {{ flow_log.deliver_logs_status }}
+ {% if flow_log.deliver_logs_error_message is not none %}
+ {{ flow_log.deliver_logs_error_message }}
+ {% endif %}
+ {{ flow_log.log_format }}
+ ACTIVE
+ {{ flow_log.id }}
+ {{ flow_log.max_aggregation_interval }}
+ {% if flow_log.deliver_logs_permission_arn is not none %}
+ {{ flow_log.deliver_logs_permission_arn }}
+ {% endif %}
+ {% if flow_log.log_group_name is not none %}
+ {{ flow_log.log_group_name }}
+ {% endif %}
+ {% if flow_log.get_tags() %}
+
+ {% for tag in flow_log.get_tags() %}
+
-
+ {{ tag.key }}
+ {{ tag.value }}
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% endfor %}
+
+"""
diff --git a/moto/ec2/utils.py b/moto/ec2/utils.py
index 653cd055d..e6763fec1 100644
--- a/moto/ec2/utils.py
+++ b/moto/ec2/utils.py
@@ -16,6 +16,7 @@ from moto.core import ACCOUNT_ID
EC2_RESOURCE_TO_PREFIX = {
"customer-gateway": "cgw",
"dhcp-options": "dopt",
+ "flow-logs": "fl",
"image": "ami",
"instance": "i",
"internet-gateway": "igw",
@@ -74,6 +75,10 @@ def random_security_group_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["security-group"])
+def random_flow_log_id():
+ return random_id(prefix=EC2_RESOURCE_TO_PREFIX["flow-logs"])
+
+
def random_snapshot_id():
return random_id(prefix=EC2_RESOURCE_TO_PREFIX["snapshot"])
diff --git a/tests/test_ec2/test_flow_logs.py b/tests/test_ec2/test_flow_logs.py
new file mode 100644
index 000000000..044e6c31d
--- /dev/null
+++ b/tests/test_ec2/test_flow_logs.py
@@ -0,0 +1,678 @@
+from __future__ import unicode_literals
+
+import tests.backport_assert_raises # noqa
+from nose.tools import assert_raises
+
+import boto3
+
+from botocore.exceptions import ParamValidationError, ClientError
+from botocore.parsers import ResponseParserError
+import json
+import sure # noqa
+import random
+import sys
+
+from moto import (
+ settings,
+ mock_cloudformation,
+ mock_ec2,
+ mock_s3,
+ mock_logs,
+)
+from moto.core import ACCOUNT_ID
+from moto.ec2.exceptions import FilterNotImplementedError
+
+
+@mock_s3
+@mock_ec2
+def test_create_flow_logs_s3():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+
+ vpc = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+
+ bucket = s3.create_bucket(
+ Bucket="test-flow-logs",
+ CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
+ )
+
+ with assert_raises(ClientError) as ex:
+ client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket.name,
+ DryRun=True,
+ )
+ ex.exception.response["Error"]["Code"].should.equal("DryRunOperation")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "An error occurred (DryRunOperation) when calling the CreateFlowLogs operation: Request would have succeeded, but DryRun flag is set"
+ )
+
+ response = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket.name,
+ )["FlowLogIds"]
+ response.should.have.length_of(1)
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(1)
+
+ flow_log = flow_logs[0]
+
+ flow_log["FlowLogId"].should.equal(response[0])
+ flow_log["DeliverLogsStatus"].should.equal("SUCCESS")
+ flow_log["FlowLogStatus"].should.equal("ACTIVE")
+ flow_log["ResourceId"].should.equal(vpc["VpcId"])
+ flow_log["TrafficType"].should.equal("ALL")
+ flow_log["LogDestinationType"].should.equal("s3")
+ flow_log["LogDestination"].should.equal("arn:aws:s3:::" + bucket.name)
+ flow_log["LogFormat"].should.equal(
+ "${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}"
+ )
+ flow_log["MaxAggregationInterval"].should.equal(600)
+
+
+@mock_logs
+@mock_ec2
+def test_create_flow_logs_cloud_watch():
+ client = boto3.client("ec2", region_name="us-west-1")
+ logs_client = boto3.client("logs", region_name="us-west-1")
+
+ vpc = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+ logs_client.create_log_group(logGroupName="test-group")
+
+ with assert_raises(ClientError) as ex:
+ client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="cloud-watch-logs",
+ LogGroupName="test-group",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role",
+ DryRun=True,
+ )
+ ex.exception.response["Error"]["Code"].should.equal("DryRunOperation")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "An error occurred (DryRunOperation) when calling the CreateFlowLogs operation: Request would have succeeded, but DryRun flag is set"
+ )
+
+ response = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="cloud-watch-logs",
+ LogGroupName="test-group",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role",
+ )["FlowLogIds"]
+ response.should.have.length_of(1)
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(1)
+
+ flow_log = flow_logs[0]
+
+ flow_log["FlowLogId"].should.equal(response[0])
+ flow_log["DeliverLogsStatus"].should.equal("SUCCESS")
+ flow_log["FlowLogStatus"].should.equal("ACTIVE")
+ flow_log["ResourceId"].should.equal(vpc["VpcId"])
+ flow_log["TrafficType"].should.equal("ALL")
+ flow_log["LogDestinationType"].should.equal("cloud-watch-logs")
+ flow_log["LogGroupName"].should.equal("test-group")
+ flow_log["DeliverLogsPermissionArn"].should.equal(
+ "arn:aws:iam::" + ACCOUNT_ID + ":role/test-role"
+ )
+ flow_log["LogFormat"].should.equal(
+ "${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}"
+ )
+ flow_log["MaxAggregationInterval"].should.equal(600)
+
+
+@mock_s3
+@mock_ec2
+def test_create_flow_log_create():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+
+ vpc1 = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+ vpc2 = client.create_vpc(CidrBlock="10.1.0.0/16")["Vpc"]
+
+ bucket = s3.create_bucket(
+ Bucket="test-flow-logs",
+ CreateBucketConfiguration={"LocationConstraint": "us-west-1",},
+ )
+
+ response = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc1["VpcId"], vpc2["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket.name,
+ LogFormat="${version} ${vpc-id} ${subnet-id} ${instance-id} ${interface-id} ${account-id} ${type} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${pkt-srcaddr} ${pkt-dstaddr} ${protocol} ${bytes} ${packets} ${start} ${end} ${action} ${tcp-flags} ${log-status}",
+ )["FlowLogIds"]
+ response.should.have.length_of(2)
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(2)
+
+ flow_logs[0]["LogFormat"].should.equal(
+ "${version} ${vpc-id} ${subnet-id} ${instance-id} ${interface-id} ${account-id} ${type} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${pkt-srcaddr} ${pkt-dstaddr} ${protocol} ${bytes} ${packets} ${start} ${end} ${action} ${tcp-flags} ${log-status}"
+ )
+ flow_logs[1]["LogFormat"].should.equal(
+ "${version} ${vpc-id} ${subnet-id} ${instance-id} ${interface-id} ${account-id} ${type} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${pkt-srcaddr} ${pkt-dstaddr} ${protocol} ${bytes} ${packets} ${start} ${end} ${action} ${tcp-flags} ${log-status}"
+ )
+
+
+@mock_s3
+@mock_ec2
+def test_delete_flow_logs():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+
+ vpc1 = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+ vpc2 = client.create_vpc(CidrBlock="10.1.0.0/16")["Vpc"]
+
+ bucket = s3.create_bucket(
+ Bucket="test-flow-logs",
+ CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
+ )
+
+ response = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc1["VpcId"], vpc2["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket.name,
+ )["FlowLogIds"]
+ response.should.have.length_of(2)
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(2)
+
+ client.delete_flow_logs(FlowLogIds=[response[0]])
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(1)
+ flow_logs[0]["FlowLogId"].should.equal(response[1])
+
+ client.delete_flow_logs(FlowLogIds=[response[1]])
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(0)
+
+
+@mock_s3
+@mock_ec2
+def test_delete_flow_logs_delete_many():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+
+ vpc1 = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+ vpc2 = client.create_vpc(CidrBlock="10.1.0.0/16")["Vpc"]
+
+ bucket = s3.create_bucket(
+ Bucket="test-flow-logs",
+ CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
+ )
+
+ response = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc1["VpcId"], vpc2["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket.name,
+ )["FlowLogIds"]
+ response.should.have.length_of(2)
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(2)
+
+ client.delete_flow_logs(FlowLogIds=response)
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(0)
+
+
+@mock_ec2
+def test_delete_flow_logs_non_existing():
+ client = boto3.client("ec2", region_name="us-west-1")
+
+ with assert_raises(ClientError) as ex:
+ client.delete_flow_logs(FlowLogIds=["fl-1a2b3c4d"])
+ ex.exception.response["Error"]["Code"].should.equal("InvalidFlowLogId.NotFound")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "These flow log ids in the input list are not found: [TotalCount: 1] fl-1a2b3c4d"
+ )
+
+ with assert_raises(ClientError) as ex:
+ client.delete_flow_logs(FlowLogIds=["fl-1a2b3c4d", "fl-2b3c4d5e"])
+ ex.exception.response["Error"]["Code"].should.equal("InvalidFlowLogId.NotFound")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "These flow log ids in the input list are not found: [TotalCount: 2] fl-1a2b3c4d fl-2b3c4d5e"
+ )
+
+
+@mock_ec2
+def test_create_flow_logs_unsuccessful():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+
+ vpc1 = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+ vpc2 = client.create_vpc(CidrBlock="10.1.0.0/16")["Vpc"]
+
+ response = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc1["VpcId"], vpc2["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::non-existing-bucket",
+ )
+ response["FlowLogIds"].should.have.length_of(0)
+ response["Unsuccessful"].should.have.length_of(2)
+
+ error1 = response["Unsuccessful"][0]["Error"]
+ error2 = response["Unsuccessful"][1]["Error"]
+
+ error1["Code"].should.equal("400")
+ error1["Message"].should.equal(
+ "LogDestination: non-existing-bucket does not exist."
+ )
+ error2["Code"].should.equal("400")
+ error2["Message"].should.equal(
+ "LogDestination: non-existing-bucket does not exist."
+ )
+
+
+@mock_s3
+@mock_ec2
+def test_create_flow_logs_invalid_parameters():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+
+ vpc = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+
+ bucket = s3.create_bucket(
+ Bucket="test-flow-logs",
+ CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
+ )
+
+ with assert_raises(ClientError) as ex:
+ client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket.name,
+ MaxAggregationInterval=10,
+ )
+ ex.exception.response["Error"]["Code"].should.equal("InvalidParameter")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "Invalid Flow Log Max Aggregation Interval"
+ )
+
+ with assert_raises(ClientError) as ex:
+ client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ )
+ ex.exception.response["Error"]["Code"].should.equal("InvalidParameter")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "LogDestination can't be empty if LogGroupName is not provided."
+ )
+
+ with assert_raises(ClientError) as ex:
+ client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogGroupName="test",
+ )
+ ex.exception.response["Error"]["Code"].should.equal("InvalidParameter")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "LogDestination type must be cloud-watch-logs if LogGroupName is provided."
+ )
+
+ with assert_raises(ClientError) as ex:
+ client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogGroupName="test",
+ )
+ ex.exception.response["Error"]["Code"].should.equal("InvalidParameter")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "DeliverLogsPermissionArn can't be empty if LogDestinationType is cloud-watch-logs."
+ )
+
+ response = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket.name,
+ )["FlowLogIds"]
+ response.should.have.length_of(1)
+
+ with assert_raises(ClientError) as ex:
+ client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket.name,
+ )
+ ex.exception.response["Error"]["Code"].should.equal("FlowLogAlreadyExists")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "Error. There is an existing Flow Log with the same configuration and log destination."
+ )
+
+ response = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogGroupName="test-group",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role",
+ )["FlowLogIds"]
+ response.should.have.length_of(1)
+
+ with assert_raises(ClientError) as ex:
+ client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc["VpcId"]],
+ TrafficType="ALL",
+ LogGroupName="test-group",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role",
+ )
+ ex.exception.response["Error"]["Code"].should.equal("FlowLogAlreadyExists")
+ ex.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
+ ex.exception.response["Error"]["Message"].should.equal(
+ "Error. There is an existing Flow Log with the same configuration and log destination."
+ )
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(2)
+
+
+@mock_s3
+@mock_ec2
+@mock_logs
+def test_describe_flow_logs_filtering():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+ logs_client = boto3.client("logs", region_name="us-west-1")
+
+ vpc1 = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+ vpc2 = client.create_vpc(CidrBlock="10.1.0.0/16")["Vpc"]
+ vpc3 = client.create_vpc(CidrBlock="10.2.0.0/16")["Vpc"]
+
+ subnet1 = client.create_subnet(VpcId=vpc1["VpcId"], CidrBlock="10.0.0.0/18")[
+ "Subnet"
+ ]
+
+ bucket1 = s3.create_bucket(
+ Bucket="test-flow-logs-1",
+ CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
+ )
+
+ logs_client.create_log_group(logGroupName="test-group")
+
+ fl1 = client.create_flow_logs(
+ ResourceType="Subnet",
+ ResourceIds=[subnet1["SubnetId"]],
+ TrafficType="ALL",
+ LogGroupName="test-group",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role",
+ )["FlowLogIds"][0]
+
+ fl2 = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc2["VpcId"]],
+ TrafficType="Accept",
+ LogDestinationType="s3",
+ LogDestination="arn:aws:s3:::" + bucket1.name,
+ TagSpecifications=[
+ {"ResourceType": "vpc-flow-log", "Tags": [{"Key": "foo", "Value": "bar"}]}
+ ],
+ )["FlowLogIds"][0]
+
+ fl3 = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc3["VpcId"]],
+ TrafficType="Reject",
+ LogGroupName="non-existing-group",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role",
+ )["FlowLogIds"][0]
+
+ all_flow_logs = client.describe_flow_logs()["FlowLogs"]
+ all_flow_logs.should.have.length_of(3)
+
+ fl_by_deliver_status = client.describe_flow_logs(
+ Filters=[{"Name": "deliver-log-status", "Values": ["SUCCESS"]}],
+ )["FlowLogs"]
+ fl_by_deliver_status.should.have.length_of(3)
+
+ fl_by_s3_bucket = client.describe_flow_logs(
+ Filters=[{"Name": "log-destination-type", "Values": ["s3"]}],
+ )["FlowLogs"]
+ fl_by_s3_bucket.should.have.length_of(1)
+ fl_by_s3_bucket[0]["FlowLogId"].should.equal(fl2)
+ fl_by_s3_bucket[0]["ResourceId"].should.equal(vpc2["VpcId"])
+
+ fl_by_cloud_watch = client.describe_flow_logs(
+ Filters=[{"Name": "log-destination-type", "Values": ["cloud-watch-logs"]}],
+ )["FlowLogs"]
+ fl_by_cloud_watch.should.have.length_of(2)
+
+ flow_logs_ids = tuple(map(lambda fl: fl["FlowLogId"], fl_by_cloud_watch))
+ fl1.should.be.within(flow_logs_ids)
+ fl3.should.be.within(flow_logs_ids)
+
+ flow_logs_resource_ids = tuple(map(lambda fl: fl["ResourceId"], fl_by_cloud_watch))
+ subnet1["SubnetId"].should.be.within(flow_logs_resource_ids)
+ vpc3["VpcId"].should.be.within(flow_logs_resource_ids)
+
+ test_fl3 = next(fl for fl in fl_by_cloud_watch if fl["FlowLogId"] == fl3)
+ test_fl3["DeliverLogsStatus"].should.equal("FAILED")
+ test_fl3["DeliverLogsErrorMessage"].should.equal("Access error")
+
+ fl_by_both = client.describe_flow_logs(
+ Filters=[
+ {"Name": "log-destination-type", "Values": ["cloud-watch-logs", "s3"]}
+ ],
+ )["FlowLogs"]
+ fl_by_both.should.have.length_of(3)
+
+ fl_by_flow_log_ids = client.describe_flow_logs(
+ Filters=[{"Name": "flow-log-id", "Values": [fl1, fl3]}],
+ )["FlowLogs"]
+ fl_by_flow_log_ids.should.have.length_of(2)
+ flow_logs_ids = tuple(map(lambda fl: fl["FlowLogId"], fl_by_flow_log_ids))
+ fl1.should.be.within(flow_logs_ids)
+ fl3.should.be.within(flow_logs_ids)
+
+ flow_logs_resource_ids = tuple(map(lambda fl: fl["ResourceId"], fl_by_flow_log_ids))
+ subnet1["SubnetId"].should.be.within(flow_logs_resource_ids)
+ vpc3["VpcId"].should.be.within(flow_logs_resource_ids)
+
+ fl_by_group_name = client.describe_flow_logs(
+ Filters=[{"Name": "log-group-name", "Values": ["test-group"]}],
+ )["FlowLogs"]
+ fl_by_group_name.should.have.length_of(1)
+ fl_by_group_name[0]["FlowLogId"].should.equal(fl1)
+ fl_by_group_name[0]["ResourceId"].should.equal(subnet1["SubnetId"])
+
+ fl_by_group_name = client.describe_flow_logs(
+ Filters=[{"Name": "log-group-name", "Values": ["non-existing-group"]}],
+ )["FlowLogs"]
+ fl_by_group_name.should.have.length_of(1)
+ fl_by_group_name[0]["FlowLogId"].should.equal(fl3)
+ fl_by_group_name[0]["ResourceId"].should.equal(vpc3["VpcId"])
+
+ fl_by_resource_id = client.describe_flow_logs(
+ Filters=[{"Name": "resource-id", "Values": [vpc2["VpcId"]]}],
+ )["FlowLogs"]
+ fl_by_resource_id.should.have.length_of(1)
+ fl_by_resource_id[0]["FlowLogId"].should.equal(fl2)
+ fl_by_resource_id[0]["ResourceId"].should.equal(vpc2["VpcId"])
+
+ fl_by_traffic_type = client.describe_flow_logs(
+ Filters=[{"Name": "traffic-type", "Values": ["ALL"]}],
+ )["FlowLogs"]
+ fl_by_traffic_type.should.have.length_of(1)
+ fl_by_traffic_type[0]["FlowLogId"].should.equal(fl1)
+ fl_by_traffic_type[0]["ResourceId"].should.equal(subnet1["SubnetId"])
+
+ fl_by_traffic_type = client.describe_flow_logs(
+ Filters=[{"Name": "traffic-type", "Values": ["Reject"]}],
+ )["FlowLogs"]
+ fl_by_traffic_type.should.have.length_of(1)
+ fl_by_traffic_type[0]["FlowLogId"].should.equal(fl3)
+ fl_by_traffic_type[0]["ResourceId"].should.equal(vpc3["VpcId"])
+
+ fl_by_traffic_type = client.describe_flow_logs(
+ Filters=[{"Name": "traffic-type", "Values": ["Accept"]}],
+ )["FlowLogs"]
+ fl_by_traffic_type.should.have.length_of(1)
+ fl_by_traffic_type[0]["FlowLogId"].should.equal(fl2)
+ fl_by_traffic_type[0]["ResourceId"].should.equal(vpc2["VpcId"])
+
+ fl_by_tag_key = client.describe_flow_logs(
+ Filters=[{"Name": "tag-key", "Values": ["foo"]}],
+ )["FlowLogs"]
+ fl_by_tag_key.should.have.length_of(1)
+ fl_by_tag_key[0]["FlowLogId"].should.equal(fl2)
+ fl_by_tag_key[0]["ResourceId"].should.equal(vpc2["VpcId"])
+
+ fl_by_tag_key = client.describe_flow_logs(
+ Filters=[{"Name": "tag-key", "Values": ["non-existing"]}],
+ )["FlowLogs"]
+ fl_by_tag_key.should.have.length_of(0)
+
+ if not settings.TEST_SERVER_MODE:
+ client.describe_flow_logs.when.called_with(
+ Filters=[{"Name": "not-implemented-filter", "Values": ["foobar"]}],
+ ).should.throw(FilterNotImplementedError)
+ else:
+ client.describe_flow_logs.when.called_with(
+ Filters=[{"Name": "not-implemented-filter", "Values": ["foobar"]}],
+ ).should.throw(ResponseParserError)
+
+
+@mock_s3
+@mock_ec2
+def test_flow_logs_by_ids():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+
+ vpc1 = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+ vpc2 = client.create_vpc(CidrBlock="10.1.0.0/16")["Vpc"]
+ vpc3 = client.create_vpc(CidrBlock="10.2.0.0/16")["Vpc"]
+
+ fl1 = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc1["VpcId"]],
+ TrafficType="Reject",
+ LogGroupName="test-group-1",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role-1",
+ )["FlowLogIds"][0]
+
+ fl2 = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc2["VpcId"]],
+ TrafficType="Reject",
+ LogGroupName="test-group-3",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role-3",
+ )["FlowLogIds"][0]
+
+ fl3 = client.create_flow_logs(
+ ResourceType="VPC",
+ ResourceIds=[vpc3["VpcId"]],
+ TrafficType="Reject",
+ LogGroupName="test-group-3",
+ DeliverLogsPermissionArn="arn:aws:iam::" + ACCOUNT_ID + ":role/test-role-3",
+ )["FlowLogIds"][0]
+
+ flow_logs = client.describe_flow_logs(FlowLogIds=[fl1, fl3])["FlowLogs"]
+ flow_logs.should.have.length_of(2)
+ flow_logs_ids = tuple(map(lambda fl: fl["FlowLogId"], flow_logs))
+ fl1.should.be.within(flow_logs_ids)
+ fl3.should.be.within(flow_logs_ids)
+
+ flow_logs_resource_ids = tuple(map(lambda fl: fl["ResourceId"], flow_logs))
+ vpc1["VpcId"].should.be.within(flow_logs_resource_ids)
+ vpc3["VpcId"].should.be.within(flow_logs_resource_ids)
+
+ client.delete_flow_logs(FlowLogIds=[fl1, fl3])
+
+ flow_logs = client.describe_flow_logs(FlowLogIds=[fl1, fl3])["FlowLogs"]
+ flow_logs.should.have.length_of(0)
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(1)
+ flow_logs[0]["FlowLogId"].should.equal(fl2)
+ flow_logs[0]["ResourceId"].should.equal(vpc2["VpcId"])
+
+ flow_logs = client.delete_flow_logs(FlowLogIds=[fl2])
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(0)
+
+
+@mock_cloudformation
+@mock_ec2
+@mock_s3
+def test_flow_logs_by_cloudformation():
+ s3 = boto3.resource("s3", region_name="us-west-1")
+ client = boto3.client("ec2", region_name="us-west-1")
+ cf_client = boto3.client("cloudformation", "us-west-1")
+
+ vpc = client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
+
+ bucket = s3.create_bucket(
+ Bucket="test-flow-logs",
+ CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
+ )
+
+ flow_log_template = {
+ "AWSTemplateFormatVersion": "2010-09-09",
+ "Description": "Template for VPC Flow Logs creation.",
+ "Resources": {
+ "TestFlowLogs": {
+ "Type": "AWS::EC2::FlowLog",
+ "Properties": {
+ "ResourceType": "VPC",
+ "ResourceId": vpc["VpcId"],
+ "TrafficType": "ALL",
+ "LogDestinationType": "s3",
+ "LogDestination": "arn:aws:s3:::" + bucket.name,
+ "MaxAggregationInterval": "60",
+ "Tags": [{"Key": "foo", "Value": "bar"}],
+ },
+ }
+ },
+ }
+ flow_log_template_json = json.dumps(flow_log_template)
+ stack_id = cf_client.create_stack(
+ StackName="test_stack", TemplateBody=flow_log_template_json
+ )["StackId"]
+
+ flow_logs = client.describe_flow_logs()["FlowLogs"]
+ flow_logs.should.have.length_of(1)
+ flow_logs[0]["ResourceId"].should.equal(vpc["VpcId"])
+ flow_logs[0]["LogDestination"].should.equal("arn:aws:s3:::" + bucket.name)
+ flow_logs[0]["MaxAggregationInterval"].should.equal(60)