SES: Support templates with if/else (#6867)

This commit is contained in:
Bert Blommers 2023-09-30 07:35:11 +00:00 committed by GitHub
parent 504387dcb8
commit 6fe4999de2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 279 additions and 50 deletions

View File

@ -42,4 +42,4 @@ jobs:
env:
MOTO_TEST_ALLOW_AWS_REQUEST: ${{ true }}
run: |
pytest -sv tests/test_ec2/ tests/test_s3 -m aws_verified
pytest -sv tests/test_ec2/ tests/test_ses/ tests/test_s3 -m aws_verified

View File

@ -518,6 +518,9 @@ class SESBackend(BaseBackend):
)
return rendered_template
def delete_template(self, name: str) -> None:
self.templates.pop(name)
def create_receipt_rule_set(self, rule_set_name: str) -> None:
if self.receipt_rule_set.get(rule_set_name) is not None:
raise RuleSetNameAlreadyExists("Duplicate Receipt Rule Set Name.")

View File

@ -281,6 +281,11 @@ class EmailResponse(BaseResponse):
template = self.response_template(RENDER_TEMPLATE)
return template.render(template=rendered_template)
def delete_template(self) -> str:
name = self._get_param("TemplateName")
self.backend.delete_template(name)
return self.response_template(DELETE_TEMPLATE).render()
def create_receipt_rule_set(self) -> str:
rule_set_name = self._get_param("RuleSetName")
self.backend.create_receipt_rule_set(rule_set_name)
@ -627,6 +632,14 @@ RENDER_TEMPLATE = """
</TestRenderTemplateResponse>
"""
DELETE_TEMPLATE = """<DeleteTemplateResponse xmlns="http://ses.amazonaws.com/doc/2010-12-01/">
<DeleteTemplateResult>
</DeleteTemplateResult>
<ResponseMetadata>
<RequestId>47e0ef1a-9bf2-11e1-9279-0100e8cf12ba</RequestId>
</ResponseMetadata>
</DeleteTemplateResponse>"""
CREATE_RECEIPT_RULE_SET = """<CreateReceiptRuleSetResponse xmlns="http://ses.amazonaws.com/doc/2010-12-01/">
<CreateReceiptRuleSetResult/>
<ResponseMetadata>

View File

@ -1,53 +1,180 @@
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Type
from .exceptions import MissingRenderingAttributeException
from moto.utilities.tokenizer import GenericTokenizer
class BlockProcessor:
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
self.template = template
self.template_data = template_data
self.tokenizer = tokenizer
def parse(self) -> str:
# Added to make MyPy happy
# Not all implementations have this method
# It's up to the caller to know whether to call this method
raise NotImplementedError
class EachBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
self.template = template
self.tokenizer = tokenizer
self.tokenizer.skip_characters("#each")
self.tokenizer.skip_white_space()
var_name = self.tokenizer.read_until("}}").strip()
self.tokenizer.skip_characters("}}")
self.template_data = template_data.get(var_name, [])
def parse(self) -> str:
parsed = ""
current_pos = self.tokenizer.token_pos
for template_data in self.template_data:
self.tokenizer.token_pos = current_pos
for char in self.tokenizer:
if char == "{" and self.tokenizer.peek() == "{":
self.tokenizer.skip_characters("{")
self.tokenizer.skip_white_space()
_processor = get_processor(self.tokenizer)(
self.template, template_data, self.tokenizer # type: ignore
)
# If we've reached the end, we should stop processing
# Our parent will continue with whatever comes after {{/each}}
if type(_processor) == EachEndBlockProcessor:
break
# If we've encountered another processor, they can continue
parsed += _processor.parse()
continue
parsed += char
return parsed
class EachEndBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
super().__init__(template, template_data, tokenizer)
self.tokenizer.skip_characters("/each")
self.tokenizer.skip_white_space()
self.tokenizer.skip_characters("}}")
class IfBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
super().__init__(template, template_data, tokenizer)
self.tokenizer.skip_characters("#if")
self.tokenizer.skip_white_space()
condition = self.tokenizer.read_until("}}").strip()
self.tokenizer.skip_characters("}}")
self.parse_contents = template_data.get(condition)
def parse(self) -> str:
parsed = ""
for char in self.tokenizer:
if char == "{" and self.tokenizer.peek() == "{":
self.tokenizer.skip_characters("{")
self.tokenizer.skip_white_space()
_processor = get_processor(self.tokenizer)(
self.template, self.template_data, self.tokenizer
)
if type(_processor) == IfEndBlockProcessor:
break
elif type(_processor) == ElseBlockProcessor:
self.parse_contents = not self.parse_contents
continue
if self.parse_contents:
parsed += _processor.parse()
continue
if self.parse_contents:
parsed += char
return parsed
class IfEndBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
super().__init__(template, template_data, tokenizer)
self.tokenizer.skip_characters("/if")
self.tokenizer.skip_white_space()
self.tokenizer.skip_characters("}}")
class ElseBlockProcessor(BlockProcessor):
def __init__(
self, template: str, template_data: Dict[str, Any], tokenizer: GenericTokenizer
):
super().__init__(template, template_data, tokenizer)
self.tokenizer.skip_characters("else")
self.tokenizer.skip_white_space()
self.tokenizer.skip_characters("}}")
class VarBlockProcessor(BlockProcessor):
def parse(self) -> str:
var_name = self.tokenizer.read_until("}}").strip()
if self.template_data.get(var_name) is None:
raise MissingRenderingAttributeException(var_name)
data: str = self.template_data.get(var_name) # type: ignore
self.tokenizer.skip_white_space()
self.tokenizer.skip_characters("}}")
return data
def get_processor(tokenizer: GenericTokenizer) -> Type[BlockProcessor]:
if tokenizer.peek(5) == "#each":
return EachBlockProcessor
if tokenizer.peek(5) == "/each":
return EachEndBlockProcessor
if tokenizer.peek(3) == "#if":
return IfBlockProcessor
if tokenizer.peek(3) == "/if":
return IfEndBlockProcessor
if tokenizer.peek(4) == "else":
return ElseBlockProcessor
return VarBlockProcessor
def parse_template(
template: str,
template_data: Dict[str, Any],
tokenizer: Optional[GenericTokenizer] = None,
until: Optional[str] = None,
) -> str:
tokenizer = tokenizer or GenericTokenizer(template)
state = ""
parsed = ""
var_name = ""
for char in tokenizer:
if until is not None and (char + tokenizer.peek(len(until) - 1)) == until:
return parsed
if char == "{" and tokenizer.peek() == "{":
# Two braces next to each other indicate a variable/language construct such as for-each
# We have different processors handling different constructs
tokenizer.skip_characters("{")
tokenizer.skip_white_space()
if tokenizer.peek() == "#":
state = "LOOP"
tokenizer.skip_characters("#each")
tokenizer.skip_white_space()
else:
state = "VAR"
continue
if char == "}":
if state in ["LOOP", "VAR"]:
tokenizer.skip_characters("}")
if state == "VAR":
if template_data.get(var_name) is None:
raise MissingRenderingAttributeException(var_name)
parsed += template_data.get(var_name) # type: ignore
else:
current_position = tokenizer.token_pos
for item in template_data.get(var_name, []):
tokenizer.token_pos = current_position
parsed += parse_template(
template, item, tokenizer, until="{{/each}}"
)
tokenizer.skip_characters("{/each}}")
var_name = ""
state = ""
continue
if state in ["LOOP", "VAR"]:
var_name += char
_processor = get_processor(tokenizer)(template, template_data, tokenizer)
parsed += _processor.parse()
continue
parsed += char
return parsed

View File

@ -38,6 +38,14 @@ class GenericTokenizer:
return ""
raise StopIteration
def read_until(self, phrase: str) -> str:
chars_read = ""
for char in self:
chars_read += char
if self.peek(len(phrase)) == phrase:
return chars_read
return chars_read
def skip_characters(self, phrase: str, case_sensitive: bool = False) -> None:
"""
Skip the characters in the supplied phrase.

View File

@ -1 +1,28 @@
# This file is intentionally left blank.
import os
from functools import wraps
from moto import mock_ses
def ses_aws_verified(func):
"""
Function that is verified to work against AWS.
Can be run against AWS at any time by setting:
MOTO_TEST_ALLOW_AWS_REQUEST=true
If this environment variable is not set, the function runs in a `mock_ses` context.
"""
@wraps(func)
def pagination_wrapper():
allow_aws_request = (
os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true"
)
if allow_aws_request:
resp = func()
else:
with mock_ses():
resp = func()
return resp
return pagination_wrapper

View File

@ -7,6 +7,7 @@ from botocore.exceptions import ClientError
from botocore.exceptions import ParamValidationError
import pytest
from moto import mock_ses
from . import ses_aws_verified
@mock_ses
@ -1296,8 +1297,8 @@ def test_render_template():
)
@mock_ses
def test_render_template__with_foreach():
@ses_aws_verified
def test_render_template__advanced():
conn = boto3.client("ses", region_name="us-east-1")
kwargs = {
@ -1305,24 +1306,27 @@ def test_render_template__with_foreach():
"TemplateData": json.dumps(
{
"items": [
{"type": "dog", "name": "bobby"},
{"type": "cat", "name": "pedro"},
{"type": "dog", "name": "bobby", "best": True},
{"type": "cat", "name": "pedro", "best": False},
]
}
),
}
conn.create_template(
Template={
"TemplateName": "MTT",
"SubjectPart": "..",
"TextPart": "..",
"HtmlPart": "{{#each items}} {{name}} is a {{type}}, {{/each}}",
}
)
result = conn.test_render_template(**kwargs)
assert "bobby is a dog" in result["RenderedTemplate"]
assert "pedro is a cat" in result["RenderedTemplate"]
try:
conn.create_template(
Template={
"TemplateName": "MTT",
"SubjectPart": "..",
"TextPart": "..",
"HtmlPart": "{{#each items}} {{name}} is {{#if best}}the best{{else}}a {{type}}{{/if}}, {{/each}}",
}
)
result = conn.test_render_template(**kwargs)
assert "bobby is the best" in result["RenderedTemplate"]
assert "pedro is a cat" in result["RenderedTemplate"]
finally:
conn.delete_template(TemplateName="MTT")
@mock_ses

View File

@ -34,3 +34,50 @@ def test_template_with_single_curly_brace():
assert parse_template(template, template_data={"name": "John"}) == (
"Dear John my {brace} is fine."
)
def test_template_with_if():
template = "Dear {{ #if name }}John{{ /if }}"
assert parse_template(template, template_data={"name": True}) == "Dear John"
assert parse_template(template, template_data={"name": False}) == "Dear "
def test_template_with_if_else():
template = "Dear {{ #if name }}John{{ else }}English{{ /if }}"
assert parse_template(template, template_data={"name": True}) == "Dear John"
assert parse_template(template, template_data={"name": False}) == "Dear English"
def test_template_with_nested_foreaches():
template = (
"{{#each l}} - {{obj}} {{#each l2}} {{o1}} {{/each}} infix-each {{/each}}"
)
kwargs = {
"name": True,
"l": [
{
"obj": "it1",
"l2": [{"o1": "ob1", "o2": "ob2"}, {"o1": "oc1", "o2": "oc2"}],
},
{"obj": "it2", "l2": [{"o1": "ob3"}]},
],
}
assert (
parse_template(template, kwargs)
== " - it1 ob1 oc1 infix-each - it2 ob3 infix-each "
)
def test_template_with_nested_ifs_and_foreaches():
template = "{{#each l}} - {{obj}} {{#if name}} post-if {{#each l2}} {{o1}} {{/each}} pre-if-end {{else}}elsedata{{/if}} post-if {{/each}}"
kwargs = {
"name": True,
"l": [
{"obj": "it1", "name": True, "l2": [{"o1": "ob1"}]},
{"obj": "it2", "name": False},
],
}
assert (
parse_template(template, kwargs)
== " - it1 post-if ob1 pre-if-end post-if - it2 elsedata post-if "
)