Implemented S3 get/put_notification_configuration (#1516)

closes #973
This commit is contained in:
Mike Grima 2018-03-21 09:11:24 -07:00 committed by Jack Danger
parent 4b3469292a
commit cc14114afe
4 changed files with 486 additions and 29 deletions

View File

@ -138,3 +138,33 @@ class CrossLocationLoggingProhibitted(S3ClientError):
"CrossLocationLoggingProhibitted",
"Cross S3 location logging not allowed."
)
class InvalidNotificationARN(S3ClientError):
code = 400
def __init__(self, *args, **kwargs):
super(InvalidNotificationARN, self).__init__(
"InvalidArgument",
"The ARN is not well formed",
*args, **kwargs)
class InvalidNotificationDestination(S3ClientError):
code = 400
def __init__(self, *args, **kwargs):
super(InvalidNotificationDestination, self).__init__(
"InvalidArgument",
"The notification destination service region is not valid for the bucket location constraint",
*args, **kwargs)
class InvalidNotificationEvent(S3ClientError):
code = 400
def __init__(self, *args, **kwargs):
super(InvalidNotificationEvent, self).__init__(
"InvalidArgument",
"The event is not supported for notifications",
*args, **kwargs)

View File

@ -6,12 +6,16 @@ import hashlib
import copy
import itertools
import codecs
import random
import string
import six
from bisect import insort
from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds, rfc_1123_datetime
from .exceptions import BucketAlreadyExists, MissingBucket, InvalidPart, EntityTooSmall, MissingKey
from .exceptions import BucketAlreadyExists, MissingBucket, InvalidPart, EntityTooSmall, MissingKey, \
InvalidNotificationDestination
from .utils import clean_key_name, _VersionedKeyStore
UPLOAD_ID_BYTES = 43
@ -270,7 +274,7 @@ def get_canned_acl(acl):
grants.append(FakeGrant([ALL_USERS_GRANTEE], [PERMISSION_READ]))
elif acl == 'public-read-write':
grants.append(FakeGrant([ALL_USERS_GRANTEE], [
PERMISSION_READ, PERMISSION_WRITE]))
PERMISSION_READ, PERMISSION_WRITE]))
elif acl == 'authenticated-read':
grants.append(
FakeGrant([AUTHENTICATED_USERS_GRANTEE], [PERMISSION_READ]))
@ -282,7 +286,7 @@ def get_canned_acl(acl):
pass # TODO: bucket owner, EC2 Read
elif acl == 'log-delivery-write':
grants.append(FakeGrant([LOG_DELIVERY_GRANTEE], [
PERMISSION_READ_ACP, PERMISSION_WRITE]))
PERMISSION_READ_ACP, PERMISSION_WRITE]))
else:
assert False, 'Unknown canned acl: %s' % (acl,)
return FakeAcl(grants=grants)
@ -333,6 +337,26 @@ class CorsRule(BaseModel):
self.max_age_seconds = max_age_seconds
class Notification(BaseModel):
def __init__(self, arn, events, filters=None, id=None):
self.id = id if id else ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(50))
self.arn = arn
self.events = events
self.filters = filters if filters else {}
class NotificationConfiguration(BaseModel):
def __init__(self, topic=None, queue=None, cloud_function=None):
self.topic = [Notification(t["Topic"], t["Event"], filters=t.get("Filter"), id=t.get("Id")) for t in topic] \
if topic else []
self.queue = [Notification(q["Queue"], q["Event"], filters=q.get("Filter"), id=q.get("Id")) for q in queue] \
if queue else []
self.cloud_function = [Notification(c["CloudFunction"], c["Event"], filters=c.get("Filter"), id=c.get("Id"))
for c in cloud_function] if cloud_function else []
class FakeBucket(BaseModel):
def __init__(self, name, region_name):
@ -348,6 +372,7 @@ class FakeBucket(BaseModel):
self.tags = FakeTagging()
self.cors = []
self.logging = {}
self.notification_configuration = None
@property
def location(self):
@ -426,36 +451,55 @@ class FakeBucket(BaseModel):
def set_logging(self, logging_config, bucket_backend):
if not logging_config:
self.logging = {}
else:
from moto.s3.exceptions import InvalidTargetBucketForLogging, CrossLocationLoggingProhibitted
# Target bucket must exist in the same account (assuming all moto buckets are in the same account):
if not bucket_backend.buckets.get(logging_config["TargetBucket"]):
raise InvalidTargetBucketForLogging("The target bucket for logging does not exist.")
return
# Does the target bucket have the log-delivery WRITE and READ_ACP permissions?
write = read_acp = False
for grant in bucket_backend.buckets[logging_config["TargetBucket"]].acl.grants:
# Must be granted to: http://acs.amazonaws.com/groups/s3/LogDelivery
for grantee in grant.grantees:
if grantee.uri == "http://acs.amazonaws.com/groups/s3/LogDelivery":
if "WRITE" in grant.permissions or "FULL_CONTROL" in grant.permissions:
write = True
from moto.s3.exceptions import InvalidTargetBucketForLogging, CrossLocationLoggingProhibitted
# Target bucket must exist in the same account (assuming all moto buckets are in the same account):
if not bucket_backend.buckets.get(logging_config["TargetBucket"]):
raise InvalidTargetBucketForLogging("The target bucket for logging does not exist.")
if "READ_ACP" in grant.permissions or "FULL_CONTROL" in grant.permissions:
read_acp = True
# Does the target bucket have the log-delivery WRITE and READ_ACP permissions?
write = read_acp = False
for grant in bucket_backend.buckets[logging_config["TargetBucket"]].acl.grants:
# Must be granted to: http://acs.amazonaws.com/groups/s3/LogDelivery
for grantee in grant.grantees:
if grantee.uri == "http://acs.amazonaws.com/groups/s3/LogDelivery":
if "WRITE" in grant.permissions or "FULL_CONTROL" in grant.permissions:
write = True
break
if "READ_ACP" in grant.permissions or "FULL_CONTROL" in grant.permissions:
read_acp = True
if not write or not read_acp:
raise InvalidTargetBucketForLogging("You must give the log-delivery group WRITE and READ_ACP"
" permissions to the target bucket")
break
# Buckets must also exist within the same region:
if bucket_backend.buckets[logging_config["TargetBucket"]].region_name != self.region_name:
raise CrossLocationLoggingProhibitted()
if not write or not read_acp:
raise InvalidTargetBucketForLogging("You must give the log-delivery group WRITE and READ_ACP"
" permissions to the target bucket")
# Checks pass -- set the logging config:
self.logging = logging_config
# Buckets must also exist within the same region:
if bucket_backend.buckets[logging_config["TargetBucket"]].region_name != self.region_name:
raise CrossLocationLoggingProhibitted()
# Checks pass -- set the logging config:
self.logging = logging_config
def set_notification_configuration(self, notification_config):
if not notification_config:
self.notification_configuration = None
return
self.notification_configuration = NotificationConfiguration(
topic=notification_config.get("TopicConfiguration"),
queue=notification_config.get("QueueConfiguration"),
cloud_function=notification_config.get("CloudFunctionConfiguration")
)
# Validate that the region is correct:
for thing in ["topic", "queue", "cloud_function"]:
for t in getattr(self.notification_configuration, thing):
region = t.arn.split(":")[3]
if region != self.region_name:
raise InvalidNotificationDestination()
def set_website_configuration(self, website_configuration):
self.website_configuration = website_configuration
@ -651,6 +695,10 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name)
bucket.delete_cors()
def put_bucket_notification_configuration(self, bucket_name, notification_config):
bucket = self.get_bucket(bucket_name)
bucket.set_notification_configuration(notification_config)
def initiate_multipart(self, bucket_name, key_name, metadata):
bucket = self.get_bucket(bucket_name)
new_multipart = FakeMultipart(key_name, metadata)

View File

@ -15,7 +15,7 @@ from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_n
parse_key_name as bucketpath_parse_key_name, is_delete_keys as bucketpath_is_delete_keys
from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, MissingKey, InvalidPartOrder, MalformedXML, \
MalformedACLError
MalformedACLError, InvalidNotificationARN, InvalidNotificationEvent
from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, \
FakeTag
from .utils import bucket_name_from_url, metadata_from_headers, parse_region_from_url
@ -243,6 +243,13 @@ class ResponseObject(_TemplateEnvironmentMixin):
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_CORS_RESPONSE)
return template.render(bucket=bucket)
elif "notification" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.notification_configuration:
return 200, {}, ""
template = self.response_template(S3_GET_BUCKET_NOTIFICATION_CONFIG)
return template.render(bucket=bucket)
elif 'versions' in querystring:
delimiter = querystring.get('delimiter', [None])[0]
encoding_type = querystring.get('encoding-type', [None])[0]
@ -411,6 +418,15 @@ class ResponseObject(_TemplateEnvironmentMixin):
return ""
except KeyError:
raise MalformedXML()
elif "notification" in querystring:
try:
self.backend.put_bucket_notification_configuration(bucket_name,
self._notification_config_from_xml(body))
return ""
except KeyError:
raise MalformedXML()
except Exception as e:
raise e
else:
if body:
@ -918,6 +934,74 @@ class ResponseObject(_TemplateEnvironmentMixin):
return parsed_xml["BucketLoggingStatus"]["LoggingEnabled"]
def _notification_config_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
if not len(parsed_xml["NotificationConfiguration"]):
return {}
# The types of notifications, and their required fields (apparently lambda is categorized by the API as
# "CloudFunction"):
notification_fields = [
("Topic", "sns"),
("Queue", "sqs"),
("CloudFunction", "lambda")
]
event_names = [
's3:ReducedRedundancyLostObject',
's3:ObjectCreated:*',
's3:ObjectCreated:Put',
's3:ObjectCreated:Post',
's3:ObjectCreated:Copy',
's3:ObjectCreated:CompleteMultipartUpload',
's3:ObjectRemoved:*',
's3:ObjectRemoved:Delete',
's3:ObjectRemoved:DeleteMarkerCreated'
]
found_notifications = 0 # Tripwire -- if this is not ever set, then there were no notifications
for name, arn_string in notification_fields:
# 1st verify that the proper notification configuration has been passed in (with an ARN that is close
# to being correct -- nothing too complex in the ARN logic):
the_notification = parsed_xml["NotificationConfiguration"].get("{}Configuration".format(name))
if the_notification:
found_notifications += 1
if not isinstance(the_notification, list):
the_notification = parsed_xml["NotificationConfiguration"]["{}Configuration".format(name)] \
= [the_notification]
for n in the_notification:
if not n[name].startswith("arn:aws:{}:".format(arn_string)):
raise InvalidNotificationARN()
# 2nd, verify that the Events list is correct:
assert n["Event"]
if not isinstance(n["Event"], list):
n["Event"] = [n["Event"]]
for event in n["Event"]:
if event not in event_names:
raise InvalidNotificationEvent()
# Parse out the filters:
if n.get("Filter"):
# Error if S3Key is blank:
if not n["Filter"]["S3Key"]:
raise KeyError()
if not isinstance(n["Filter"]["S3Key"]["FilterRule"], list):
n["Filter"]["S3Key"]["FilterRule"] = [n["Filter"]["S3Key"]["FilterRule"]]
for filter_rule in n["Filter"]["S3Key"]["FilterRule"]:
assert filter_rule["Name"] in ["suffix", "prefix"]
assert filter_rule["Value"]
if not found_notifications:
return {}
return parsed_xml["NotificationConfiguration"]
def _key_response_delete(self, bucket_name, query, key_name, headers):
if query.get('uploadId'):
upload_id = query['uploadId'][0]
@ -1460,3 +1544,71 @@ S3_LOGGING_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
S3_NO_LOGGING_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<BucketLoggingStatus xmlns="http://doc.s3.amazonaws.com/2006-03-01" />
"""
S3_GET_BUCKET_NOTIFICATION_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
{% for topic in bucket.notification_configuration.topic %}
<TopicConfiguration>
<Id>{{ topic.id }}</Id>
<Topic>{{ topic.arn }}</Topic>
{% for event in topic.events %}
<Event>{{ event }}</Event>
{% endfor %}
{% if topic.filters %}
<Filter>
<S3Key>
{% for rule in topic.filters["S3Key"]["FilterRule"] %}
<FilterRule>
<Name>{{ rule["Name"] }}</Name>
<Value>{{ rule["Value"] }}</Value>
</FilterRule>
{% endfor %}
</S3Key>
</Filter>
{% endif %}
</TopicConfiguration>
{% endfor %}
{% for queue in bucket.notification_configuration.queue %}
<QueueConfiguration>
<Id>{{ queue.id }}</Id>
<Queue>{{ queue.arn }}</Queue>
{% for event in queue.events %}
<Event>{{ event }}</Event>
{% endfor %}
{% if queue.filters %}
<Filter>
<S3Key>
{% for rule in queue.filters["S3Key"]["FilterRule"] %}
<FilterRule>
<Name>{{ rule["Name"] }}</Name>
<Value>{{ rule["Value"] }}</Value>
</FilterRule>
{% endfor %}
</S3Key>
</Filter>
{% endif %}
</QueueConfiguration>
{% endfor %}
{% for cf in bucket.notification_configuration.cloud_function %}
<CloudFunctionConfiguration>
<Id>{{ cf.id }}</Id>
<CloudFunction>{{ cf.arn }}</CloudFunction>
{% for event in cf.events %}
<Event>{{ event }}</Event>
{% endfor %}
{% if cf.filters %}
<Filter>
<S3Key>
{% for rule in cf.filters["S3Key"]["FilterRule"] %}
<FilterRule>
<Name>{{ rule["Name"] }}</Name>
<Value>{{ rule["Value"] }}</Value>
</FilterRule>
{% endfor %}
</S3Key>
</Filter>
{% endif %}
</CloudFunctionConfiguration>
{% endfor %}
</NotificationConfiguration>
"""

View File

@ -1835,6 +1835,233 @@ def test_put_bucket_acl_body():
assert not result.get("Grants")
@mock_s3
def test_put_bucket_notification():
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="bucket")
# With no configuration:
result = s3.get_bucket_notification(Bucket="bucket")
assert not result.get("TopicConfigurations")
assert not result.get("QueueConfigurations")
assert not result.get("LambdaFunctionConfigurations")
# Place proper topic configuration:
s3.put_bucket_notification_configuration(Bucket="bucket",
NotificationConfiguration={
"TopicConfigurations": [
{
"TopicArn": "arn:aws:sns:us-east-1:012345678910:mytopic",
"Events": [
"s3:ObjectCreated:*",
"s3:ObjectRemoved:*"
]
},
{
"TopicArn": "arn:aws:sns:us-east-1:012345678910:myothertopic",
"Events": [
"s3:ObjectCreated:*"
],
"Filter": {
"Key": {
"FilterRules": [
{
"Name": "prefix",
"Value": "images/"
},
{
"Name": "suffix",
"Value": "png"
}
]
}
}
}
]
})
# Verify to completion:
result = s3.get_bucket_notification_configuration(Bucket="bucket")
assert len(result["TopicConfigurations"]) == 2
assert not result.get("QueueConfigurations")
assert not result.get("LambdaFunctionConfigurations")
assert result["TopicConfigurations"][0]["TopicArn"] == "arn:aws:sns:us-east-1:012345678910:mytopic"
assert result["TopicConfigurations"][1]["TopicArn"] == "arn:aws:sns:us-east-1:012345678910:myothertopic"
assert len(result["TopicConfigurations"][0]["Events"]) == 2
assert len(result["TopicConfigurations"][1]["Events"]) == 1
assert result["TopicConfigurations"][0]["Events"][0] == "s3:ObjectCreated:*"
assert result["TopicConfigurations"][0]["Events"][1] == "s3:ObjectRemoved:*"
assert result["TopicConfigurations"][1]["Events"][0] == "s3:ObjectCreated:*"
assert result["TopicConfigurations"][0]["Id"]
assert result["TopicConfigurations"][1]["Id"]
assert not result["TopicConfigurations"][0].get("Filter")
assert len(result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"]) == 2
assert result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"][0]["Name"] == "prefix"
assert result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"][0]["Value"] == "images/"
assert result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"][1]["Name"] == "suffix"
assert result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"][1]["Value"] == "png"
# Place proper queue configuration:
s3.put_bucket_notification_configuration(Bucket="bucket",
NotificationConfiguration={
"QueueConfigurations": [
{
"Id": "SomeID",
"QueueArn": "arn:aws:sqs:us-east-1:012345678910:myQueue",
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{
"Name": "prefix",
"Value": "images/"
}
]
}
}
}
]
})
result = s3.get_bucket_notification_configuration(Bucket="bucket")
assert len(result["QueueConfigurations"]) == 1
assert not result.get("TopicConfigurations")
assert not result.get("LambdaFunctionConfigurations")
assert result["QueueConfigurations"][0]["Id"] == "SomeID"
assert result["QueueConfigurations"][0]["QueueArn"] == "arn:aws:sqs:us-east-1:012345678910:myQueue"
assert result["QueueConfigurations"][0]["Events"][0] == "s3:ObjectCreated:*"
assert len(result["QueueConfigurations"][0]["Events"]) == 1
assert len(result["QueueConfigurations"][0]["Filter"]["Key"]["FilterRules"]) == 1
assert result["QueueConfigurations"][0]["Filter"]["Key"]["FilterRules"][0]["Name"] == "prefix"
assert result["QueueConfigurations"][0]["Filter"]["Key"]["FilterRules"][0]["Value"] == "images/"
# Place proper Lambda configuration:
s3.put_bucket_notification_configuration(Bucket="bucket",
NotificationConfiguration={
"LambdaFunctionConfigurations": [
{
"LambdaFunctionArn":
"arn:aws:lambda:us-east-1:012345678910:function:lambda",
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{
"Name": "prefix",
"Value": "images/"
}
]
}
}
}
]
})
result = s3.get_bucket_notification_configuration(Bucket="bucket")
assert len(result["LambdaFunctionConfigurations"]) == 1
assert not result.get("TopicConfigurations")
assert not result.get("QueueConfigurations")
assert result["LambdaFunctionConfigurations"][0]["Id"]
assert result["LambdaFunctionConfigurations"][0]["LambdaFunctionArn"] == \
"arn:aws:lambda:us-east-1:012345678910:function:lambda"
assert result["LambdaFunctionConfigurations"][0]["Events"][0] == "s3:ObjectCreated:*"
assert len(result["LambdaFunctionConfigurations"][0]["Events"]) == 1
assert len(result["LambdaFunctionConfigurations"][0]["Filter"]["Key"]["FilterRules"]) == 1
assert result["LambdaFunctionConfigurations"][0]["Filter"]["Key"]["FilterRules"][0]["Name"] == "prefix"
assert result["LambdaFunctionConfigurations"][0]["Filter"]["Key"]["FilterRules"][0]["Value"] == "images/"
# And with all 3 set:
s3.put_bucket_notification_configuration(Bucket="bucket",
NotificationConfiguration={
"TopicConfigurations": [
{
"TopicArn": "arn:aws:sns:us-east-1:012345678910:mytopic",
"Events": [
"s3:ObjectCreated:*",
"s3:ObjectRemoved:*"
]
}
],
"LambdaFunctionConfigurations": [
{
"LambdaFunctionArn":
"arn:aws:lambda:us-east-1:012345678910:function:lambda",
"Events": ["s3:ObjectCreated:*"]
}
],
"QueueConfigurations": [
{
"QueueArn": "arn:aws:sqs:us-east-1:012345678910:myQueue",
"Events": ["s3:ObjectCreated:*"]
}
]
})
result = s3.get_bucket_notification_configuration(Bucket="bucket")
assert len(result["LambdaFunctionConfigurations"]) == 1
assert len(result["TopicConfigurations"]) == 1
assert len(result["QueueConfigurations"]) == 1
# And clear it out:
s3.put_bucket_notification_configuration(Bucket="bucket", NotificationConfiguration={})
result = s3.get_bucket_notification_configuration(Bucket="bucket")
assert not result.get("TopicConfigurations")
assert not result.get("QueueConfigurations")
assert not result.get("LambdaFunctionConfigurations")
@mock_s3
def test_put_bucket_notification_errors():
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="bucket")
# With incorrect ARNs:
for tech, arn in [("Queue", "sqs"), ("Topic", "sns"), ("LambdaFunction", "lambda")]:
with assert_raises(ClientError) as err:
s3.put_bucket_notification_configuration(Bucket="bucket",
NotificationConfiguration={
"{}Configurations".format(tech): [
{
"{}Arn".format(tech):
"arn:aws:{}:us-east-1:012345678910:lksajdfkldskfj",
"Events": ["s3:ObjectCreated:*"]
}
]
})
assert err.exception.response["Error"]["Code"] == "InvalidArgument"
assert err.exception.response["Error"]["Message"] == "The ARN is not well formed"
# Region not the same as the bucket:
with assert_raises(ClientError) as err:
s3.put_bucket_notification_configuration(Bucket="bucket",
NotificationConfiguration={
"QueueConfigurations": [
{
"QueueArn":
"arn:aws:sqs:us-west-2:012345678910:lksajdfkldskfj",
"Events": ["s3:ObjectCreated:*"]
}
]
})
assert err.exception.response["Error"]["Code"] == "InvalidArgument"
assert err.exception.response["Error"]["Message"] == \
"The notification destination service region is not valid for the bucket location constraint"
# Invalid event name:
with assert_raises(ClientError) as err:
s3.put_bucket_notification_configuration(Bucket="bucket",
NotificationConfiguration={
"QueueConfigurations": [
{
"QueueArn":
"arn:aws:sqs:us-east-1:012345678910:lksajdfkldskfj",
"Events": ["notarealeventname"]
}
]
})
assert err.exception.response["Error"]["Code"] == "InvalidArgument"
assert err.exception.response["Error"]["Message"] == "The event is not supported for notifications"
@mock_s3
def test_boto3_put_bucket_logging():
s3 = boto3.client("s3", region_name="us-east-1")
@ -1953,7 +2180,7 @@ def test_boto3_put_bucket_logging():
result = s3.get_bucket_logging(Bucket=bucket_name)
assert len(result["LoggingEnabled"]["TargetGrants"]) == 2
assert result["LoggingEnabled"]["TargetGrants"][0]["Grantee"]["ID"] == \
"SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274"
"SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274"
# Test with just 1 grant:
s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={