SES: Implement GetIdentityMailFromDomainAttributes and SetIdentityMailFromDomain (#4842)

This commit is contained in:
Viren Nadkarni 2022-02-09 23:49:54 +05:30 committed by GitHub
parent 3d0bbd23ac
commit 5580b519e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 201 additions and 3 deletions

View File

@ -4822,7 +4822,7 @@
- [ ] get_account_sending_enabled
- [ ] get_custom_verification_email_template
- [ ] get_identity_dkim_attributes
- [ ] get_identity_mail_from_domain_attributes
- [X] get_identity_mail_from_domain_attributes
- [X] get_identity_notification_attributes
- [ ] get_identity_policies
- [ ] get_identity_verification_attributes
@ -4850,7 +4850,7 @@
- [ ] set_identity_dkim_enabled
- [X] set_identity_feedback_forwarding_enabled
- [ ] set_identity_headers_in_notifications_enabled
- [ ] set_identity_mail_from_domain
- [X] set_identity_mail_from_domain
- [X] set_identity_notification_topic
- [ ] set_receipt_rule_position
- [ ] test_render_template

View File

@ -117,6 +117,7 @@ class SESBackend(BaseBackend):
self.config_set = {}
self.config_set_event_destination = {}
self.event_destinations = {}
self.identity_mail_from_domains = {}
self.templates = {}
self.receipt_rule_set = {}
@ -477,5 +478,49 @@ class SESBackend(BaseBackend):
else:
raise RuleDoesNotExist(f"Rule does not exist: {rule['name']}")
def set_identity_mail_from_domain(
self, identity, mail_from_domain=None, behavior_on_mx_failure=None
):
if identity not in (self.domains + self.addresses):
raise InvalidParameterValue(
"Identity '{0}' does not exist.".format(identity)
)
if mail_from_domain is None:
self.identity_mail_from_domains.pop(identity)
return
if not mail_from_domain.endswith(identity):
raise InvalidParameterValue(
"Provided MAIL-FROM domain '{0}' is not subdomain of "
"the domain of the identity '{1}'.".format(mail_from_domain, identity)
)
if behavior_on_mx_failure not in (None, "RejectMessage", "UseDefaultValue"):
raise ValidationError(
"1 validation error detected: "
"Value '{0}' at 'behaviorOnMXFailure'"
"failed to satisfy constraint: Member must satisfy enum value set: "
"[RejectMessage, UseDefaultValue]".format(behavior_on_mx_failure)
)
self.identity_mail_from_domains[identity] = {
"mail_from_domain": mail_from_domain,
"behavior_on_mx_failure": behavior_on_mx_failure,
}
def get_identity_mail_from_domain_attributes(self, identities=None):
if identities is None:
identities = []
attributes_by_identity = {}
for identity in identities:
if identity in (self.domains + self.addresses):
attributes_by_identity[identity] = self.identity_mail_from_domains.get(
identity
) or {"behavior_on_mx_failure": "UseDefaultValue"}
return attributes_by_identity
ses_backend = SESBackend()

View File

@ -279,6 +279,25 @@ class EmailResponse(BaseResponse):
template = self.response_template(UPDATE_RECEIPT_RULE)
return template.render()
def set_identity_mail_from_domain(self):
identity = self._get_param("Identity")
mail_from_domain = self._get_param("MailFromDomain")
behavior_on_mx_failure = self._get_param("BehaviorOnMXFailure")
ses_backend.set_identity_mail_from_domain(
identity, mail_from_domain, behavior_on_mx_failure
)
template = self.response_template(SET_IDENTITY_MAIL_FROM_DOMAIN)
return template.render()
def get_identity_mail_from_domain_attributes(self):
identities = self._get_multi_param("Identities.member.")
identities = ses_backend.get_identity_mail_from_domain_attributes(identities)
template = self.response_template(GET_IDENTITY_MAIL_FROM_DOMAIN_ATTRIBUTES)
return template.render(identities=identities)
VERIFY_EMAIL_IDENTITY = """<VerifyEmailIdentityResponse xmlns="http://ses.amazonaws.com/doc/2010-12-01/">
<VerifyEmailIdentityResult/>
@ -633,3 +652,36 @@ UPDATE_RECEIPT_RULE = """<UpdateReceiptRuleResponse xmlns="http://ses.amazonaws.
<RequestId>15e0ef1a-9bf2-11e1-9279-01ab88cf109a</RequestId>
</ResponseMetadata>
</UpdateReceiptRuleResponse>"""
SET_IDENTITY_MAIL_FROM_DOMAIN = """<SetIdentityMailFromDomainResponse xmlns="http://ses.amazonaws.com/doc/2010-12-01/">
<SetIdentityMailFromDomainResult/>
<ResponseMetadata>
<RequestId>47e0ef1a-9bf2-11e1-9279-0100e8cf109a</RequestId>
</ResponseMetadata>
</SetIdentityMailFromDomainResponse>"""
GET_IDENTITY_MAIL_FROM_DOMAIN_ATTRIBUTES = """<GetIdentityMailFromDomainAttributesResponse xmlns="http://ses.amazonaws.com/doc/2010-12-01/">
<GetIdentityMailFromDomainAttributesResult>
{% if identities.items()|length > 0 %}
<MailFromDomainAttributes>
{% for name, value in identities.items() %}
<entry>
<key>{{ name }}</key>
<value>
{% if 'mail_from_domain' in value %}
<MailFromDomain>{{ value.get("mail_from_domain") }}</MailFromDomain>
<MailFromDomainStatus>Success</MailFromDomainStatus>
{% endif %}
<BehaviorOnMXFailure>{{ value.get("behavior_on_mx_failure") }}</BehaviorOnMXFailure>
</value>
</entry>
{% endfor %}
</MailFromDomainAttributes>
{% else %}
<MailFromDomainAttributes/>
{% endif %}
</GetIdentityMailFromDomainAttributesResult>
<ResponseMetadata>
<RequestId>47e0ef1a-9bf2-11e1-9279-0100e8cf109a</RequestId>
</ResponseMetadata>
</GetIdentityMailFromDomainAttributesResponse>"""

View File

@ -1230,3 +1230,104 @@ def test_get_send_statistics():
stats[0]["Rejects"].should.equal(1)
stats[0]["DeliveryAttempts"].should.equal(1)
@mock_ses
def test_set_identity_mail_from_domain():
conn = boto3.client("ses", region_name="eu-central-1")
# Must raise if provided identity does not exist
with pytest.raises(ClientError) as exc:
conn.set_identity_mail_from_domain(Identity="foo.com")
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue")
err["Message"].should.equal("Identity 'foo.com' does not exist.")
conn.verify_domain_identity(Domain="foo.com")
# Must raise if MAILFROM is not a subdomain of identity
with pytest.raises(ClientError) as exc:
conn.set_identity_mail_from_domain(
Identity="foo.com", MailFromDomain="lorem.ipsum.com"
)
err = exc.value.response["Error"]
err["Code"].should.equal("InvalidParameterValue")
err["Message"].should.equal(
"Provided MAIL-FROM domain 'lorem.ipsum.com' is not subdomain of "
"the domain of the identity 'foo.com'."
)
# Must raise if BehaviorOnMXFailure is not a valid choice
with pytest.raises(ClientError) as exc:
conn.set_identity_mail_from_domain(
Identity="foo.com",
MailFromDomain="lorem.foo.com",
BehaviorOnMXFailure="SelfDestruct",
)
err = exc.value.response["Error"]
err["Code"].should.equal("ValidationError")
err["Message"].should.equal(
"1 validation error detected: Value 'SelfDestruct' at "
"'behaviorOnMXFailure'failed to satisfy constraint: Member must "
"satisfy enum value set: [RejectMessage, UseDefaultValue]"
)
# Must set config for valid input
behaviour_on_mx_failure = "RejectMessage"
mail_from_domain = "lorem.foo.com"
conn.set_identity_mail_from_domain(
Identity="foo.com",
MailFromDomain=mail_from_domain,
BehaviorOnMXFailure=behaviour_on_mx_failure,
)
attributes = conn.get_identity_mail_from_domain_attributes(Identities=["foo.com"])
actual_attributes = attributes["MailFromDomainAttributes"]["foo.com"]
actual_attributes.should.have.key("MailFromDomain").being.equal(mail_from_domain)
actual_attributes.should.have.key("BehaviorOnMXFailure").being.equal(
behaviour_on_mx_failure
)
actual_attributes.should.have.key("MailFromDomainStatus").being.equal("Success")
# Must unset config when MailFromDomain is null
conn.set_identity_mail_from_domain(Identity="foo.com")
attributes = conn.get_identity_mail_from_domain_attributes(Identities=["foo.com"])
actual_attributes = attributes["MailFromDomainAttributes"]["foo.com"]
actual_attributes.should.have.key("BehaviorOnMXFailure").being.equal(
"UseDefaultValue"
)
actual_attributes.should_not.have.key("MailFromDomain")
actual_attributes.should_not.have.key("MailFromDomainStatus")
@mock_ses
def test_get_identity_mail_from_domain_attributes():
conn = boto3.client("ses", region_name="eu-central-1")
# Must return empty for non-existent identities
attributes = conn.get_identity_mail_from_domain_attributes(
Identities=["bar@foo.com", "lorem.com"]
)
attributes["MailFromDomainAttributes"].should.have.length_of(0)
# Must return default options for non-configured identities
conn.verify_email_identity(EmailAddress="bar@foo.com")
attributes = conn.get_identity_mail_from_domain_attributes(
Identities=["bar@foo.com", "lorem.com"]
)
attributes["MailFromDomainAttributes"].should.have.length_of(1)
attributes["MailFromDomainAttributes"]["bar@foo.com"].should.have.length_of(1)
attributes["MailFromDomainAttributes"]["bar@foo.com"].should.have.key(
"BehaviorOnMXFailure"
).being.equal("UseDefaultValue")
# Must return multiple configured identities
conn.verify_domain_identity(Domain="lorem.com")
attributes = conn.get_identity_mail_from_domain_attributes(
Identities=["bar@foo.com", "lorem.com"]
)
attributes["MailFromDomainAttributes"].should.have.length_of(2)
attributes["MailFromDomainAttributes"]["bar@foo.com"].should.have.length_of(1)
attributes["MailFromDomainAttributes"]["lorem.com"].should.have.length_of(1)