moto/moto/wafv2/models.py
2022-11-10 08:43:20 -01:00

187 lines
6.2 KiB
Python

import datetime
import re
from typing import Dict
from moto.core import BaseBackend, BackendDict, BaseModel
from .utils import make_arn_for_wacl
from .exceptions import WAFV2DuplicateItemException, WAFNonexistentItemException
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
from collections import OrderedDict
US_EAST_1_REGION = "us-east-1"
GLOBAL_REGION = "global"
APIGATEWAY_REGEX = (
r"arn:aws:apigateway:[a-zA-Z0-9-]+::/restapis/[a-zA-Z0-9]+/stages/[a-zA-Z0-9]+"
)
# TODO: Add remaining properties
class FakeWebACL(BaseModel):
"""
https://docs.aws.amazon.com/waf/latest/APIReference/API_WebACL.html
"""
def __init__(
self, name, arn, wacl_id, visibility_config, default_action, description, rules
):
self.name = name
self.created_time = iso_8601_datetime_with_milliseconds(datetime.datetime.now())
self.id = wacl_id
self.arn = arn
self.description = description or ""
self.capacity = 3
self.rules = rules
self.visibility_config = visibility_config
self.default_action = default_action
self.lock_token = str(mock_random.uuid4())[0:6]
def update(self, default_action, rules, description, visibility_config):
if default_action is not None:
self.default_action = default_action
if rules is not None:
self.rules = rules
if description is not None:
self.description = description
if visibility_config is not None:
self.visibility_config = visibility_config
self.lock_token = str(mock_random.uuid4())[0:6]
def to_dict(self):
# Format for summary https://docs.aws.amazon.com/waf/latest/APIReference/API_CreateWebACL.html (response syntax section)
return {
"ARN": self.arn,
"Description": self.description,
"Id": self.id,
"Name": self.name,
"Rules": self.rules,
"DefaultAction": self.default_action,
"VisibilityConfig": self.visibility_config,
}
class WAFV2Backend(BaseBackend):
"""
https://docs.aws.amazon.com/waf/latest/APIReference/API_Operations_AWS_WAFV2.html
"""
def __init__(self, region_name, account_id):
super().__init__(region_name, account_id)
self.wacls: Dict[str, FakeWebACL] = OrderedDict()
self.tagging_service = TaggingService()
# TODO: self.load_balancers = OrderedDict()
def associate_web_acl(self, web_acl_arn, resource_arn):
"""
Only APIGateway Stages can be associated at the moment.
"""
if web_acl_arn not in self.wacls:
raise WAFNonexistentItemException
stage = self._find_apigw_stage(resource_arn)
if stage:
stage.web_acl_arn = web_acl_arn
def disassociate_web_acl(self, resource_arn):
stage = self._find_apigw_stage(resource_arn)
if stage:
stage.web_acl_arn = None
def get_web_acl_for_resource(self, resource_arn):
stage = self._find_apigw_stage(resource_arn)
if stage and stage.web_acl_arn is not None:
wacl_arn = stage.web_acl_arn
return self.wacls.get(wacl_arn)
return None
def _find_apigw_stage(self, resource_arn):
try:
if re.search(APIGATEWAY_REGEX, resource_arn):
region = resource_arn.split(":")[3]
rest_api_id = resource_arn.split("/")[-3]
stage_name = resource_arn.split("/")[-1]
from moto.apigateway import apigateway_backends
apigw = apigateway_backends[self.account_id][region]
return apigw.get_stage(rest_api_id, stage_name)
except: # noqa: E722 Do not use bare except
return None
def create_web_acl(
self, name, visibility_config, default_action, scope, description, tags, rules
):
"""
The following parameters are not yet implemented: CustomResponseBodies, CaptchaConfig
"""
wacl_id = str(mock_random.uuid4())
arn = make_arn_for_wacl(
name=name,
account_id=self.account_id,
region_name=self.region_name,
wacl_id=wacl_id,
scope=scope,
)
if arn in self.wacls or self._is_duplicate_name(name):
raise WAFV2DuplicateItemException()
new_wacl = FakeWebACL(
name, arn, wacl_id, visibility_config, default_action, description, rules
)
self.wacls[arn] = new_wacl
self.tag_resource(arn, tags)
return new_wacl
def delete_web_acl(self, name, _id):
"""
The LockToken-parameter is not yet implemented
"""
self.wacls = {
arn: wacl
for arn, wacl in self.wacls.items()
if wacl.name != name and wacl.id != _id
}
def get_web_acl(self, name, _id) -> FakeWebACL:
for wacl in self.wacls.values():
if wacl.name == name and wacl.id == _id:
return wacl
raise WAFNonexistentItemException
def list_web_acls(self):
return [wacl.to_dict() for wacl in self.wacls.values()]
def _is_duplicate_name(self, name):
allWaclNames = set(wacl.name for wacl in self.wacls.values())
return name in allWaclNames
def list_rule_groups(self):
return []
def list_tags_for_resource(self, arn):
"""
Pagination is not yet implemented
"""
return self.tagging_service.list_tags_for_resource(arn)["Tags"]
def tag_resource(self, arn, tags):
self.tagging_service.tag_resource(arn, tags)
def untag_resource(self, arn, tag_keys):
self.tagging_service.untag_resource_using_names(arn, tag_keys)
def update_web_acl(
self, name, _id, default_action, rules, description, visibility_config
):
"""
The following parameters are not yet implemented: LockToken, CustomResponseBodies, CaptchaConfig
"""
acl = self.get_web_acl(name, _id)
acl.update(default_action, rules, description, visibility_config)
return acl.lock_token
wafv2_backends = BackendDict(
WAFV2Backend, "waf-regional", additional_regions=["global"]
)