from __future__ import unicode_literals
import base64
from moto.core.responses import BaseResponse
from .models import ses_backend
from datetime import datetime
class EmailResponse(BaseResponse):
def verify_email_identity(self):
address = self.querystring.get("EmailAddress")[0]
ses_backend.verify_email_identity(address)
template = self.response_template(VERIFY_EMAIL_IDENTITY)
return template.render()
def verify_email_address(self):
address = self.querystring.get("EmailAddress")[0]
ses_backend.verify_email_address(address)
template = self.response_template(VERIFY_EMAIL_ADDRESS)
return template.render()
def list_identities(self):
identities = ses_backend.list_identities()
template = self.response_template(LIST_IDENTITIES_RESPONSE)
return template.render(identities=identities)
def list_verified_email_addresses(self):
email_addresses = ses_backend.list_verified_email_addresses()
template = self.response_template(LIST_VERIFIED_EMAIL_RESPONSE)
return template.render(email_addresses=email_addresses)
def verify_domain_dkim(self):
domain = self.querystring.get("Domain")[0]
ses_backend.verify_domain(domain)
template = self.response_template(VERIFY_DOMAIN_DKIM_RESPONSE)
return template.render()
def verify_domain_identity(self):
domain = self.querystring.get("Domain")[0]
ses_backend.verify_domain(domain)
template = self.response_template(VERIFY_DOMAIN_IDENTITY_RESPONSE)
return template.render()
def delete_identity(self):
domain = self.querystring.get("Identity")[0]
ses_backend.delete_identity(domain)
template = self.response_template(DELETE_IDENTITY_RESPONSE)
return template.render()
def send_email(self):
bodydatakey = "Message.Body.Text.Data"
if "Message.Body.Html.Data" in self.querystring:
bodydatakey = "Message.Body.Html.Data"
body = self.querystring.get(bodydatakey)[0]
source = self.querystring.get("Source")[0]
subject = self.querystring.get("Message.Subject.Data")[0]
destinations = {"ToAddresses": [], "CcAddresses": [], "BccAddresses": []}
for dest_type in destinations:
# consume up to 51 to allow exception
for i in range(1, 52):
field = "Destination.%s.member.%s" % (dest_type, i)
address = self.querystring.get(field)
if address is None:
break
destinations[dest_type].append(address[0])
message = ses_backend.send_email(
source, subject, body, destinations, self.region
)
template = self.response_template(SEND_EMAIL_RESPONSE)
return template.render(message=message)
def send_templated_email(self):
source = self.querystring.get("Source")[0]
template = self.querystring.get("Template")
template_data = self.querystring.get("TemplateData")
destinations = {"ToAddresses": [], "CcAddresses": [], "BccAddresses": []}
for dest_type in destinations:
# consume up to 51 to allow exception
for i in range(1, 52):
field = "Destination.%s.member.%s" % (dest_type, i)
address = self.querystring.get(field)
if address is None:
break
destinations[dest_type].append(address[0])
message = ses_backend.send_templated_email(
source, template, template_data, destinations, self.region
)
template = self.response_template(SEND_TEMPLATED_EMAIL_RESPONSE)
return template.render(message=message)
def send_raw_email(self):
source = self.querystring.get("Source")
if source is not None:
(source,) = source
raw_data = self.querystring.get("RawMessage.Data")[0]
raw_data = base64.b64decode(raw_data)
raw_data = raw_data.decode("utf-8")
destinations = []
# consume up to 51 to allow exception
for i in range(1, 52):
field = "Destinations.member.%s" % i
address = self.querystring.get(field)
if address is None:
break
destinations.append(address[0])
message = ses_backend.send_raw_email(
source, destinations, raw_data, self.region
)
template = self.response_template(SEND_RAW_EMAIL_RESPONSE)
return template.render(message=message)
def get_send_quota(self):
quota = ses_backend.get_send_quota()
template = self.response_template(GET_SEND_QUOTA_RESPONSE)
return template.render(quota=quota)
def set_identity_notification_topic(self):
identity = self.querystring.get("Identity")[0]
not_type = self.querystring.get("NotificationType")[0]
sns_topic = self.querystring.get("SnsTopic")
if sns_topic:
sns_topic = sns_topic[0]
ses_backend.set_identity_notification_topic(identity, not_type, sns_topic)
template = self.response_template(SET_IDENTITY_NOTIFICATION_TOPIC_RESPONSE)
return template.render()
def get_send_statistics(self):
statistics = ses_backend.get_send_statistics()
template = self.response_template(GET_SEND_STATISTICS)
return template.render(all_statistics=[statistics])
def create_configuration_set(self):
configuration_set_name = self.querystring.get("ConfigurationSet.Name")[0]
ses_backend.create_configuration_set(
configuration_set_name=configuration_set_name
)
template = self.response_template(CREATE_CONFIGURATION_SET)
return template.render()
def create_configuration_set_event_destination(self):
configuration_set_name = self._get_param("ConfigurationSetName")
is_configuration_event_enabled = self.querystring.get(
"EventDestination.Enabled"
)[0]
configuration_event_name = self.querystring.get("EventDestination.Name")[0]
event_topic_arn = self.querystring.get(
"EventDestination.SNSDestination.TopicARN"
)[0]
event_matching_types = self._get_multi_param(
"EventDestination.MatchingEventTypes.member"
)
event_destination = {
"Name": configuration_event_name,
"Enabled": is_configuration_event_enabled,
"EventMatchingTypes": event_matching_types,
"SNSDestination": event_topic_arn,
}
ses_backend.create_configuration_set_event_destination(
configuration_set_name=configuration_set_name,
event_destination=event_destination,
)
template = self.response_template(CREATE_CONFIGURATION_SET_EVENT_DESTINATION)
return template.render()
def create_template(self):
template_data = self._get_dict_param("Template")
template_info = {}
template_info["text_part"] = template_data.get("._text_part", "")
template_info["html_part"] = template_data.get("._html_part", "")
template_info["template_name"] = template_data.get("._name", "")
template_info["subject_part"] = template_data.get("._subject_part", "")
template_info["Timestamp"] = datetime.utcnow()
ses_backend.add_template(template_info=template_info)
template = self.response_template(CREATE_TEMPLATE)
return template.render()
def update_template(self):
template_data = self._get_dict_param("Template")
template_info = {}
template_info["text_part"] = template_data.get("._text_part", "")
template_info["html_part"] = template_data.get("._html_part", "")
template_info["template_name"] = template_data.get("._name", "")
template_info["subject_part"] = template_data.get("._subject_part", "")
template_info["Timestamp"] = datetime.utcnow()
ses_backend.update_template(template_info=template_info)
template = self.response_template(UPDATE_TEMPLATE)
return template.render()
def get_template(self):
template_name = self._get_param("TemplateName")
template_data = ses_backend.get_template(template_name)
template = self.response_template(GET_TEMPLATE)
return template.render(template_data=template_data)
def list_templates(self):
email_templates = ses_backend.list_templates()
template = self.response_template(LIST_TEMPLATES)
return template.render(templates=email_templates)
def test_render_template(self):
render_info = self._get_dict_param("Template")
rendered_template = ses_backend.render_template(render_info)
template = self.response_template(RENDER_TEMPLATE)
return template.render(template=rendered_template)
def create_receipt_rule_set(self):
rule_set_name = self._get_param("RuleSetName")
ses_backend.create_receipt_rule_set(rule_set_name)
template = self.response_template(CREATE_RECEIPT_RULE_SET)
return template.render()
def create_receipt_rule(self):
rule_set_name = self._get_param("RuleSetName")
rule = self._get_dict_param("Rule")
ses_backend.create_receipt_rule(rule_set_name, rule)
template = self.response_template(CREATE_RECEIPT_RULE)
return template.render()
VERIFY_EMAIL_IDENTITY = """
47e0ef1a-9bf2-11e1-9279-0100e8cf109a
"""
VERIFY_EMAIL_ADDRESS = """
47e0ef1a-9bf2-11e1-9279-0100e8cf109a
"""
LIST_IDENTITIES_RESPONSE = """
{% for identity in identities %}
{{ identity }}
{% endfor %}
cacecf23-9bf1-11e1-9279-0100e8cf109a
"""
LIST_VERIFIED_EMAIL_RESPONSE = """
{% for email in email_addresses %}
{{ email }}
{% endfor %}
cacecf23-9bf1-11e1-9279-0100e8cf109a
"""
VERIFY_DOMAIN_DKIM_RESPONSE = """
vvjuipp74whm76gqoni7qmwwn4w4qusjiainivf6sf
3frqe7jn4obpuxjpwpolz6ipb3k5nvt2nhjpik2oy
wrqplteh7oodxnad7hsl4mixg2uavzneazxv5sxi2
9662c15b-c469-11e1-99d1-797d6ecd6414
"""
VERIFY_DOMAIN_IDENTITY_RESPONSE = """\
QTKknzFg2J4ygwa+XvHAxUl1hyHoY0gVfZdfjIedHZ0=
94f6368e-9bf2-11e1-8ee7-c98a0037a2b6
"""
DELETE_IDENTITY_RESPONSE = """
d96bd874-9bf2-11e1-8ee7-c98a0037a2b6
"""
SEND_EMAIL_RESPONSE = """
{{ message.id }}
d5964849-c866-11e0-9beb-01a62d68c57f
"""
SEND_TEMPLATED_EMAIL_RESPONSE = """
{{ message.id }}
d5964849-c866-11e0-9beb-01a62d68c57f
"""
SEND_RAW_EMAIL_RESPONSE = """
{{ message.id }}
e0abcdfa-c866-11e0-b6d0-273d09173b49
"""
GET_SEND_QUOTA_RESPONSE = """
{{ quota.sent_past_24 }}
200.0
1.0
273021c6-c866-11e0-b926-699e21c3af9e
"""
SET_IDENTITY_NOTIFICATION_TOPIC_RESPONSE = """
47e0ef1a-9bf2-11e1-9279-0100e8cf109a
"""
GET_SEND_STATISTICS = """
{% for statistics in all_statistics %}
-
{{ statistics["DeliveryAttempts"] }}
{{ statistics["Rejects"] }}
{{ statistics["Bounces"] }}
{{ statistics["Complaints"] }}
{{ statistics["Timestamp"] }}
{% endfor %}
e0abcdfa-c866-11e0-b6d0-273d09173z49
"""
CREATE_CONFIGURATION_SET = """
47e0ef1a-9bf2-11e1-9279-0100e8cf109a
"""
CREATE_CONFIGURATION_SET_EVENT_DESTINATION = """
67e0ef1a-9bf2-11e1-9279-0100e8cf109a
"""
CREATE_TEMPLATE = """
47e0ef1a-9bf2-11e1-9279-0100e8cf12ba
"""
UPDATE_TEMPLATE = """
47e0ef1a-9bf2-11e1-9279-0100e8cf12ba
"""
GET_TEMPLATE = """
{{ template_data["template_name"] }}
{{ template_data["subject_part"] }}
{{ template_data["text_part"] }}
47e0ef1a-9bf2-11e1-9279-0100e8cf12ba
"""
LIST_TEMPLATES = """
{% for template in templates %}
-
{{ template["template_name"] }}
{{ template["Timestamp"] }}
{% endfor %}
47e0ef1a-9bf2-11e1-9279-0100e8cf12ba
"""
RENDER_TEMPLATE = """
{{template | e}}
47e0ef1a-9bf2-11e1-9279-0100e8cf12ba
"""
CREATE_RECEIPT_RULE_SET = """
47e0ef1a-9bf2-11e1-9279-01ab88cf109a
"""
CREATE_RECEIPT_RULE = """
15e0ef1a-9bf2-11e1-9279-01ab88cf109a
"""