SES: Improve template parsing (#5853)

This commit is contained in:
Bert Blommers 2023-01-19 18:27:39 -01:00 committed by GitHub
parent ba78420314
commit 66507dd898
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 168 additions and 87 deletions

View File

@ -1,70 +1,6 @@
from enum import Enum
from moto.dynamodb.exceptions import MockValidationException
class KeyConditionExpressionTokenizer:
"""
Tokenizer for a KeyConditionExpression. Should be used as an iterator.
The final character to be returned will be an empty string, to notify the caller that we've reached the end.
"""
def __init__(self, expression):
self.expression = expression
self.token_pos = 0
def __iter__(self):
self.token_pos = 0
return self
def is_eof(self):
return self.peek() == ""
def peek(self):
"""
Peek the next character without changing the position
"""
try:
return self.expression[self.token_pos]
except IndexError:
return ""
def __next__(self):
"""
Returns the next character, or an empty string if we've reached the end of the string.
Calling this method again will result in a StopIterator
"""
try:
result = self.expression[self.token_pos]
self.token_pos += 1
return result
except IndexError:
if self.token_pos == len(self.expression):
self.token_pos += 1
return ""
raise StopIteration
def skip_characters(self, phrase, case_sensitive=False) -> None:
"""
Skip the characters in the supplied phrase.
If any other character is encountered instead, this will fail.
If we've already reached the end of the iterator, this will fail.
"""
for ch in phrase:
if case_sensitive:
assert self.expression[self.token_pos] == ch
else:
assert self.expression[self.token_pos] in [ch.lower(), ch.upper()]
self.token_pos += 1
def skip_white_space(self):
"""
Skip the any whitespace characters that are coming up
"""
try:
while self.peek() == " ":
self.token_pos += 1
except IndexError:
pass
from moto.utilities.tokenizer import GenericTokenizer
class EXPRESSION_STAGES(Enum):
@ -100,7 +36,7 @@ def parse_expression(
key_name = comparison = None
key_values = []
results = []
tokenizer = KeyConditionExpressionTokenizer(key_condition_expression)
tokenizer = GenericTokenizer(key_condition_expression)
for crnt_char in tokenizer:
if crnt_char == " ":
if current_stage == EXPRESSION_STAGES.INITIAL_STAGE:

View File

@ -1,4 +1,3 @@
import re
import json
import email
import datetime
@ -23,9 +22,9 @@ from .exceptions import (
RuleSetNameAlreadyExists,
RuleSetDoesNotExist,
RuleAlreadyExists,
MissingRenderingAttributeException,
ConfigurationSetAlreadyExists,
)
from .template import parse_template
from .utils import get_random_message_id, is_valid_address
from .feedback import COMMON_MAIL, BOUNCE, COMPLAINT, DELIVERY
@ -106,17 +105,6 @@ class SESQuota(BaseModel):
return self.sent
def are_all_variables_present(template, template_data):
subject_part = template["subject_part"]
text_part = template["text_part"]
html_part = template["html_part"]
for var in re.findall("{{(.+?)}}", subject_part + text_part + html_part):
if not template_data.get(var):
return var, False
return None, True
class SESBackend(BaseBackend):
"""
Responsible for mocking calls to SES.
@ -469,18 +457,13 @@ class SESBackend(BaseBackend):
"Template rendering data is invalid"
)
var, are_variables_present = are_all_variables_present(template, template_data)
if not are_variables_present:
raise MissingRenderingAttributeException(var)
subject_part = template["subject_part"]
text_part = template["text_part"]
html_part = template["html_part"]
for key, value in template_data.items():
subject_part = str.replace(str(subject_part), "{{" + key + "}}", value)
text_part = str.replace(str(text_part), "{{" + key + "}}", value)
html_part = str.replace(str(html_part), "{{" + key + "}}", value)
subject_part = parse_template(str(subject_part), template_data)
text_part = parse_template(str(text_part), template_data)
html_part = parse_template(str(html_part), template_data)
email_obj = MIMEMultipart("alternative")

45
moto/ses/template.py Normal file
View File

@ -0,0 +1,45 @@
from .exceptions import MissingRenderingAttributeException
from moto.utilities.tokenizer import GenericTokenizer
def parse_template(template, template_data, tokenizer=None, until=None):
tokenizer = tokenizer or GenericTokenizer(template)
state = ""
parsed = ""
var_name = ""
for char in tokenizer:
if until is not None and (char + tokenizer.peek(len(until) - 1)) == until:
return parsed
if char == "{":
tokenizer.skip_characters("{")
tokenizer.skip_white_space()
if tokenizer.peek() == "#":
state = "LOOP"
tokenizer.skip_characters("#each")
tokenizer.skip_white_space()
else:
state = "VAR"
continue
if char == "}":
if state in ["LOOP", "VAR"]:
tokenizer.skip_characters("}")
if state == "VAR":
if template_data.get(var_name) is None:
raise MissingRenderingAttributeException(var_name)
parsed += template_data.get(var_name)
else:
current_position = tokenizer.token_pos
for item in template_data.get(var_name, []):
tokenizer.token_pos = current_position
parsed += parse_template(
template, item, tokenizer, until="{{/each}}"
)
tokenizer.skip_characters("{/each}}")
var_name = ""
state = ""
continue
if state in ["LOOP", "VAR"]:
var_name += char
continue
parsed += char
return parsed

View File

@ -0,0 +1,62 @@
class GenericTokenizer:
"""
Tokenizer for a KeyConditionExpression. Should be used as an iterator.
The final character to be returned will be an empty string, to notify the caller that we've reached the end.
"""
def __init__(self, expression):
self.expression = expression
self.token_pos = 0
def __iter__(self):
return self
def is_eof(self):
return self.peek() == ""
def peek(self, length=1):
"""
Peek the next character without changing the position
"""
try:
return self.expression[self.token_pos : self.token_pos + length]
except IndexError:
return ""
def __next__(self):
"""
Returns the next character, or an empty string if we've reached the end of the string.
Calling this method again will result in a StopIterator
"""
try:
result = self.expression[self.token_pos]
self.token_pos += 1
return result
except IndexError:
if self.token_pos == len(self.expression):
self.token_pos += 1
return ""
raise StopIteration
def skip_characters(self, phrase, case_sensitive=False) -> None:
"""
Skip the characters in the supplied phrase.
If any other character is encountered instead, this will fail.
If we've already reached the end of the iterator, this will fail.
"""
for ch in phrase:
if case_sensitive:
assert self.expression[self.token_pos] == ch
else:
assert self.expression[self.token_pos] in [ch.lower(), ch.upper()]
self.token_pos += 1
def skip_white_space(self):
"""
Skip any whitespace characters that are coming up
"""
try:
while self.peek() == " ":
self.token_pos += 1
except IndexError:
pass

View File

@ -1268,6 +1268,35 @@ def test_render_template():
)
@mock_ses
def test_render_template__with_foreach():
conn = boto3.client("ses", region_name="us-east-1")
kwargs = dict(
TemplateName="MTT",
TemplateData=json.dumps(
{
"items": [
{"type": "dog", "name": "bobby"},
{"type": "cat", "name": "pedro"},
]
}
),
)
conn.create_template(
Template={
"TemplateName": "MTT",
"SubjectPart": "..",
"TextPart": "..",
"HtmlPart": "{{#each items}} {{name}} is a {{type}}, {{/each}}",
}
)
result = conn.test_render_template(**kwargs)
result["RenderedTemplate"].should.contain("bobby is a dog")
result["RenderedTemplate"].should.contain("pedro is a cat")
@mock_ses
def test_update_ses_template():
conn = boto3.client("ses", region_name="us-east-1")

View File

@ -0,0 +1,26 @@
from moto.ses.template import parse_template
import sure # noqa # pylint: disable=unused-import
def test_template_without_args():
parse_template("some template", template_data={}).should.equal("some template")
def test_template_with_simple_arg():
t = "Dear {{name}},"
parse_template(t, template_data={"name": "John"}).should.equal("Dear John,")
def test_template_with_foreach():
t = "List:{{#each l}} - {{obj}}{{/each}} and other things"
kwargs = {"l": [{"obj": "it1"}, {"obj": "it2"}]}
parse_template(t, kwargs).should.equal("List: - it1 - it2 and other things")
def test_template_with_multiple_foreach():
t = "{{#each l}} - {{obj}}{{/each}} and list 2 {{#each l2}} {{o1}} {{o2}} {{/each}}"
kwargs = {
"l": [{"obj": "it1"}, {"obj": "it2"}],
"l2": [{"o1": "ob1", "o2": "ob2"}, {"o1": "oc1", "o2": "oc2"}],
}
parse_template(t, kwargs).should.equal(" - it1 - it2 and list 2 ob1 ob2 oc1 oc2 ")