From 66507dd89888e0a7b27262dd6aad78f680c85a7c Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Thu, 19 Jan 2023 18:27:39 -0100 Subject: [PATCH] SES: Improve template parsing (#5853) --- .../parsing/key_condition_expression.py | 68 +------------------ moto/ses/models.py | 25 ++----- moto/ses/template.py | 45 ++++++++++++ moto/utilities/tokenizer.py | 62 +++++++++++++++++ tests/test_ses/test_ses_boto3.py | 29 ++++++++ tests/test_ses/test_templating.py | 26 +++++++ 6 files changed, 168 insertions(+), 87 deletions(-) create mode 100644 moto/ses/template.py create mode 100644 moto/utilities/tokenizer.py create mode 100644 tests/test_ses/test_templating.py diff --git a/moto/dynamodb/parsing/key_condition_expression.py b/moto/dynamodb/parsing/key_condition_expression.py index cb9953450..3bdbaa7e1 100644 --- a/moto/dynamodb/parsing/key_condition_expression.py +++ b/moto/dynamodb/parsing/key_condition_expression.py @@ -1,70 +1,6 @@ from enum import Enum from moto.dynamodb.exceptions import MockValidationException - - -class KeyConditionExpressionTokenizer: - """ - Tokenizer for a KeyConditionExpression. Should be used as an iterator. - The final character to be returned will be an empty string, to notify the caller that we've reached the end. - """ - - def __init__(self, expression): - self.expression = expression - self.token_pos = 0 - - def __iter__(self): - self.token_pos = 0 - return self - - def is_eof(self): - return self.peek() == "" - - def peek(self): - """ - Peek the next character without changing the position - """ - try: - return self.expression[self.token_pos] - except IndexError: - return "" - - def __next__(self): - """ - Returns the next character, or an empty string if we've reached the end of the string. - Calling this method again will result in a StopIterator - """ - try: - result = self.expression[self.token_pos] - self.token_pos += 1 - return result - except IndexError: - if self.token_pos == len(self.expression): - self.token_pos += 1 - return "" - raise StopIteration - - def skip_characters(self, phrase, case_sensitive=False) -> None: - """ - Skip the characters in the supplied phrase. - If any other character is encountered instead, this will fail. - If we've already reached the end of the iterator, this will fail. - """ - for ch in phrase: - if case_sensitive: - assert self.expression[self.token_pos] == ch - else: - assert self.expression[self.token_pos] in [ch.lower(), ch.upper()] - self.token_pos += 1 - - def skip_white_space(self): - """ - Skip the any whitespace characters that are coming up - """ - try: - while self.peek() == " ": - self.token_pos += 1 - except IndexError: - pass +from moto.utilities.tokenizer import GenericTokenizer class EXPRESSION_STAGES(Enum): @@ -100,7 +36,7 @@ def parse_expression( key_name = comparison = None key_values = [] results = [] - tokenizer = KeyConditionExpressionTokenizer(key_condition_expression) + tokenizer = GenericTokenizer(key_condition_expression) for crnt_char in tokenizer: if crnt_char == " ": if current_stage == EXPRESSION_STAGES.INITIAL_STAGE: diff --git a/moto/ses/models.py b/moto/ses/models.py index 7227abe12..27e1e5fef 100644 --- a/moto/ses/models.py +++ b/moto/ses/models.py @@ -1,4 +1,3 @@ -import re import json import email import datetime @@ -23,9 +22,9 @@ from .exceptions import ( RuleSetNameAlreadyExists, RuleSetDoesNotExist, RuleAlreadyExists, - MissingRenderingAttributeException, ConfigurationSetAlreadyExists, ) +from .template import parse_template from .utils import get_random_message_id, is_valid_address from .feedback import COMMON_MAIL, BOUNCE, COMPLAINT, DELIVERY @@ -106,17 +105,6 @@ class SESQuota(BaseModel): return self.sent -def are_all_variables_present(template, template_data): - subject_part = template["subject_part"] - text_part = template["text_part"] - html_part = template["html_part"] - - for var in re.findall("{{(.+?)}}", subject_part + text_part + html_part): - if not template_data.get(var): - return var, False - return None, True - - class SESBackend(BaseBackend): """ Responsible for mocking calls to SES. @@ -469,18 +457,13 @@ class SESBackend(BaseBackend): "Template rendering data is invalid" ) - var, are_variables_present = are_all_variables_present(template, template_data) - if not are_variables_present: - raise MissingRenderingAttributeException(var) - subject_part = template["subject_part"] text_part = template["text_part"] html_part = template["html_part"] - for key, value in template_data.items(): - subject_part = str.replace(str(subject_part), "{{" + key + "}}", value) - text_part = str.replace(str(text_part), "{{" + key + "}}", value) - html_part = str.replace(str(html_part), "{{" + key + "}}", value) + subject_part = parse_template(str(subject_part), template_data) + text_part = parse_template(str(text_part), template_data) + html_part = parse_template(str(html_part), template_data) email_obj = MIMEMultipart("alternative") diff --git a/moto/ses/template.py b/moto/ses/template.py new file mode 100644 index 000000000..ec34cd19c --- /dev/null +++ b/moto/ses/template.py @@ -0,0 +1,45 @@ +from .exceptions import MissingRenderingAttributeException +from moto.utilities.tokenizer import GenericTokenizer + + +def parse_template(template, template_data, tokenizer=None, until=None): + tokenizer = tokenizer or GenericTokenizer(template) + state = "" + parsed = "" + var_name = "" + for char in tokenizer: + if until is not None and (char + tokenizer.peek(len(until) - 1)) == until: + return parsed + if char == "{": + tokenizer.skip_characters("{") + tokenizer.skip_white_space() + if tokenizer.peek() == "#": + state = "LOOP" + tokenizer.skip_characters("#each") + tokenizer.skip_white_space() + else: + state = "VAR" + continue + if char == "}": + if state in ["LOOP", "VAR"]: + tokenizer.skip_characters("}") + if state == "VAR": + if template_data.get(var_name) is None: + raise MissingRenderingAttributeException(var_name) + parsed += template_data.get(var_name) + else: + current_position = tokenizer.token_pos + for item in template_data.get(var_name, []): + tokenizer.token_pos = current_position + parsed += parse_template( + template, item, tokenizer, until="{{/each}}" + ) + tokenizer.skip_characters("{/each}}") + var_name = "" + state = "" + continue + if state in ["LOOP", "VAR"]: + var_name += char + continue + parsed += char + return parsed diff --git a/moto/utilities/tokenizer.py b/moto/utilities/tokenizer.py new file mode 100644 index 000000000..a6e76dc7f --- /dev/null +++ b/moto/utilities/tokenizer.py @@ -0,0 +1,62 @@ +class GenericTokenizer: + """ + Tokenizer for a KeyConditionExpression. Should be used as an iterator. + The final character to be returned will be an empty string, to notify the caller that we've reached the end. + """ + + def __init__(self, expression): + self.expression = expression + self.token_pos = 0 + + def __iter__(self): + return self + + def is_eof(self): + return self.peek() == "" + + def peek(self, length=1): + """ + Peek the next character without changing the position + """ + try: + return self.expression[self.token_pos : self.token_pos + length] + except IndexError: + return "" + + def __next__(self): + """ + Returns the next character, or an empty string if we've reached the end of the string. + Calling this method again will result in a StopIterator + """ + try: + result = self.expression[self.token_pos] + self.token_pos += 1 + return result + except IndexError: + if self.token_pos == len(self.expression): + self.token_pos += 1 + return "" + raise StopIteration + + def skip_characters(self, phrase, case_sensitive=False) -> None: + """ + Skip the characters in the supplied phrase. + If any other character is encountered instead, this will fail. + If we've already reached the end of the iterator, this will fail. + """ + for ch in phrase: + if case_sensitive: + assert self.expression[self.token_pos] == ch + else: + assert self.expression[self.token_pos] in [ch.lower(), ch.upper()] + self.token_pos += 1 + + def skip_white_space(self): + """ + Skip any whitespace characters that are coming up + """ + try: + while self.peek() == " ": + self.token_pos += 1 + except IndexError: + pass diff --git a/tests/test_ses/test_ses_boto3.py b/tests/test_ses/test_ses_boto3.py index c2c27cdb9..0ca55c7f8 100644 --- a/tests/test_ses/test_ses_boto3.py +++ b/tests/test_ses/test_ses_boto3.py @@ -1268,6 +1268,35 @@ def test_render_template(): ) +@mock_ses +def test_render_template__with_foreach(): + conn = boto3.client("ses", region_name="us-east-1") + + kwargs = dict( + TemplateName="MTT", + TemplateData=json.dumps( + { + "items": [ + {"type": "dog", "name": "bobby"}, + {"type": "cat", "name": "pedro"}, + ] + } + ), + ) + + conn.create_template( + Template={ + "TemplateName": "MTT", + "SubjectPart": "..", + "TextPart": "..", + "HtmlPart": "{{#each items}} {{name}} is a {{type}}, {{/each}}", + } + ) + result = conn.test_render_template(**kwargs) + result["RenderedTemplate"].should.contain("bobby is a dog") + result["RenderedTemplate"].should.contain("pedro is a cat") + + @mock_ses def test_update_ses_template(): conn = boto3.client("ses", region_name="us-east-1") diff --git a/tests/test_ses/test_templating.py b/tests/test_ses/test_templating.py new file mode 100644 index 000000000..0883013b0 --- /dev/null +++ b/tests/test_ses/test_templating.py @@ -0,0 +1,26 @@ +from moto.ses.template import parse_template +import sure # noqa # pylint: disable=unused-import + + +def test_template_without_args(): + parse_template("some template", template_data={}).should.equal("some template") + + +def test_template_with_simple_arg(): + t = "Dear {{name}}," + parse_template(t, template_data={"name": "John"}).should.equal("Dear John,") + + +def test_template_with_foreach(): + t = "List:{{#each l}} - {{obj}}{{/each}} and other things" + kwargs = {"l": [{"obj": "it1"}, {"obj": "it2"}]} + parse_template(t, kwargs).should.equal("List: - it1 - it2 and other things") + + +def test_template_with_multiple_foreach(): + t = "{{#each l}} - {{obj}}{{/each}} and list 2 {{#each l2}} {{o1}} {{o2}} {{/each}}" + kwargs = { + "l": [{"obj": "it1"}, {"obj": "it2"}], + "l2": [{"o1": "ob1", "o2": "ob2"}, {"o1": "oc1", "o2": "oc2"}], + } + parse_template(t, kwargs).should.equal(" - it1 - it2 and list 2 ob1 ob2 oc1 oc2 ")