SES, SESv2: Fix get params in SESV2Response, and destinations in send_raw_email() (#6414)

This commit is contained in:
Yoshihiro Sugi 2023-06-18 21:07:13 +09:00 committed by GitHub
parent 4cbddbed96
commit 7b98252214
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 11 deletions

View File

@ -2,7 +2,7 @@ import json
import email
import datetime
from email.mime.base import MIMEBase
from email.utils import parseaddr
from email.utils import formataddr, getaddresses, parseaddr
from email.mime.multipart import MIMEMultipart
from email.encoders import encode_7or8bit
from typing import Any, Dict, List, Optional
@ -350,8 +350,12 @@ class SESBackend(BaseBackend):
f"Did not have authority to send from email {source}"
)
for header in "TO", "CC", "BCC":
destinations += [d.strip() for d in message.get(header, "").split(",") if d]
fieldvalues = [message.get(header, "") for header in ["TO", "CC", "BCC"]]
destinations += [
formataddr((realname, email_address))
for realname, email_address in getaddresses(fieldvalues)
if email_address
]
if len(destinations) > RECIPIENT_LIMIT:
raise MessageRejectedError("Too many recipients.")
for address in [addr for addr in [source, *destinations] if addr is not None]:

View File

@ -6,7 +6,7 @@ from moto.core.responses import BaseResponse
from .models import sesv2_backends
from ..ses.responses import SEND_EMAIL_RESPONSE
from .models import SESV2Backend
from typing import List, Dict, Any
from typing import List
from urllib.parse import unquote
@ -56,7 +56,7 @@ class SESV2Response(BaseResponse):
return template.render(message=message)
def create_contact_list(self) -> str:
params = get_params_dict(self.data)
params = json.loads(self.body)
self.sesv2_backend.create_contact_list(params)
return json.dumps({})
@ -76,7 +76,7 @@ class SESV2Response(BaseResponse):
def create_contact(self) -> str:
contact_list_name = self._get_param("ContactListName")
params = get_params_dict(self.data)
params = json.loads(self.body)
self.sesv2_backend.create_contact(contact_list_name, params)
return json.dumps({})
@ -96,8 +96,3 @@ class SESV2Response(BaseResponse):
contact_list_name = self._get_param("ContactListName")
self.sesv2_backend.delete_contact(unquote(email), contact_list_name)
return json.dumps({})
def get_params_dict(odict: Dict[str, Any]) -> Any:
# parsing of these params is nasty, hopefully there is a tidier way
return json.loads(list(dict(odict.items()).keys())[0])

View File

@ -102,6 +102,41 @@ def test_send_raw_email__with_specific_message(
assert msg.destinations == ["to@example.com", "foo@example.com"]
@mock_sesv2
def test_send_raw_email__with_to_address_display_name(
ses_v1,
): # pylint: disable=redefined-outer-name
# Setup
conn = boto3.client("sesv2", region_name="us-east-1")
message = get_raw_email()
# This particular message means that to-address with display-name which contains many ','
del message["To"]
display_name = ",".join(["c" for _ in range(50)])
message["To"] = f""""{display_name}" <to@example.com>, foo@example.com"""
kwargs = dict(
Content={"Raw": {"Data": message.as_bytes()}},
)
# Execute
ses_v1.verify_email_identity(EmailAddress="test@example.com")
conn.send_email(**kwargs)
# Verify
send_quota = ses_v1.get_send_quota()
sent_count = int(send_quota["SentLast24Hours"])
assert sent_count == 2
if not settings.TEST_SERVER_MODE:
backend = ses_backends[DEFAULT_ACCOUNT_ID]["us-east-1"]
msg: RawMessage = backend.sent_messages[0]
assert message.as_bytes() == msg.raw_data.encode("utf-8")
assert msg.source == "test@example.com"
assert msg.destinations == [
""""c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c,c" <to@example.com>""",
"foo@example.com",
]
@mock_sesv2
def test_create_contact_list():
# Setup
@ -119,6 +154,30 @@ def test_create_contact_list():
assert result["ContactLists"][0]["ContactListName"] == contact_list_name
@mock_sesv2
def test_create_contact_list__with_topics():
# Setup
conn = boto3.client("sesv2", region_name="us-east-1")
contact_list_name = "test3"
# Execute
conn.create_contact_list(
ContactListName=contact_list_name,
Topics=[
{
"TopicName": "test-topic",
"DisplayName": "display=name",
"DefaultSubscriptionStatus": "OPT_OUT",
}
],
)
result = conn.list_contact_lists()
# Verify
assert len(result["ContactLists"]) == 1
assert result["ContactLists"][0]["ContactListName"] == contact_list_name
@mock_sesv2
def test_list_contact_lists():
# Setup