from moto.ec2.models import validate_resource_ids
from ._base_response import EC2BaseResponse
class FlowLogs(EC2BaseResponse):
def create_flow_logs(self) -> str:
resource_type = self._get_param("ResourceType")
resource_ids = self._get_multi_param("ResourceId")
traffic_type = self._get_param("TrafficType")
deliver_logs_permission_arn = self._get_param("DeliverLogsPermissionArn")
log_destination_type = self._get_param("LogDestinationType")
log_destination = self._get_param("LogDestination")
log_group_name = self._get_param("LogGroupName")
log_format = self._get_param("LogFormat")
max_aggregation_interval = self._get_param("MaxAggregationInterval")
validate_resource_ids(resource_ids)
tags = self._parse_tag_specification().get("vpc-flow-log", {})
self.error_on_dryrun()
flow_logs, errors = self.ec2_backend.create_flow_logs(
resource_type=resource_type,
resource_ids=resource_ids,
traffic_type=traffic_type,
deliver_logs_permission_arn=deliver_logs_permission_arn,
log_destination_type=log_destination_type,
log_destination=log_destination,
log_group_name=log_group_name,
log_format=log_format,
max_aggregation_interval=max_aggregation_interval,
)
for fl in flow_logs:
fl.add_tags(tags)
template = self.response_template(CREATE_FLOW_LOGS_RESPONSE)
return template.render(flow_logs=flow_logs, errors=errors)
def describe_flow_logs(self) -> str:
flow_log_ids = self._get_multi_param("FlowLogId")
filters = self._filters_from_querystring()
flow_logs = self.ec2_backend.describe_flow_logs(flow_log_ids, filters)
self.error_on_dryrun()
template = self.response_template(DESCRIBE_FLOW_LOGS_RESPONSE)
return template.render(flow_logs=flow_logs)
def delete_flow_logs(self) -> str:
flow_log_ids = self._get_multi_param("FlowLogId")
self.error_on_dryrun()
self.ec2_backend.delete_flow_logs(flow_log_ids)
return self.response_template(DELETE_FLOW_LOGS_RESPONSE).render()
CREATE_FLOW_LOGS_RESPONSE = """
2d96dae3-504b-4fc4-bf50-266EXAMPLE
{% for error in errors %}
-
{{ error.1 }}
{{ error.2 }}
{{ error.0 }}
{% endfor %}
{% for flow_log in flow_logs %}
- {{ flow_log.id }}
{% endfor %}
"""
DELETE_FLOW_LOGS_RESPONSE = """
c5c4f51f-f4e9-42bc-8700-EXAMPLE
"""
DESCRIBE_FLOW_LOGS_RESPONSE = """
3cb46f23-099e-4bf0-891c-EXAMPLE
{% for flow_log in flow_logs %}
-
{% if flow_log.log_destination is not none %}
{{ flow_log.log_destination }}
{% endif %}
{{ flow_log.resource_id }}
{{ flow_log.log_destination_type }}
{{ flow_log.created_at }}
{{ flow_log.traffic_type }}
{{ flow_log.deliver_logs_status }}
{% if flow_log.deliver_logs_error_message is not none %}
{{ flow_log.deliver_logs_error_message }}
{% endif %}
{{ flow_log.log_format }}
ACTIVE
{{ flow_log.id }}
{{ flow_log.max_aggregation_interval }}
{% if flow_log.deliver_logs_permission_arn is not none %}
{{ flow_log.deliver_logs_permission_arn }}
{% endif %}
{% if flow_log.log_group_name is not none %}
{{ flow_log.log_group_name }}
{% endif %}
{% if flow_log.get_tags() %}
{% for tag in flow_log.get_tags() %}
-
{{ tag.key }}
{{ tag.value }}
{% endfor %}
{% endif %}
{% endfor %}
"""