Fixed S3 lifecycle error message. (#1110)

Fixes #1109
Also added PutBucketTagging support
Also added Bucket CORS support
This commit is contained in:
Mike Grima 2017-09-07 11:30:05 -07:00 committed by Jack Danger
parent b922af8ab7
commit 5d8cd22b01
4 changed files with 523 additions and 1 deletions

View File

@ -91,3 +91,23 @@ class EntityTooSmall(S3ClientError):
"EntityTooSmall",
"Your proposed upload is smaller than the minimum allowed object size.",
*args, **kwargs)
class InvalidRequest(S3ClientError):
code = 400
def __init__(self, method, *args, **kwargs):
super(InvalidRequest, self).__init__(
"InvalidRequest",
"Found unsupported HTTP method in CORS config. Unsupported method is {}".format(method),
*args, **kwargs)
class MalformedXML(S3ClientError):
code = 400
def __init__(self, *args, **kwargs):
super(MalformedXML, self).__init__(
"MalformedXML",
"The XML you provided was not well-formed or did not validate against our published schema",
*args, **kwargs)

View File

@ -295,6 +295,26 @@ class LifecycleRule(BaseModel):
self.storage_class = storage_class
class CorsRule(BaseModel):
def __init__(self, allowed_methods, allowed_origins, allowed_headers=None, expose_headers=None,
max_age_seconds=None):
# Python 2 and 3 have different string types for handling unicodes. Python 2 wants `basestring`,
# whereas Python 3 is OK with str. This causes issues with the XML parser, which returns
# unicode strings in Python 2. So, need to do this to make it work in both Python 2 and 3:
import sys
if sys.version_info >= (3, 0):
str_type = str
else:
str_type = basestring # noqa
self.allowed_methods = [allowed_methods] if isinstance(allowed_methods, str_type) else allowed_methods
self.allowed_origins = [allowed_origins] if isinstance(allowed_origins, str_type) else allowed_origins
self.allowed_headers = [allowed_headers] if isinstance(allowed_headers, str_type) else allowed_headers
self.exposed_headers = [expose_headers] if isinstance(expose_headers, str_type) else expose_headers
self.max_age_seconds = max_age_seconds
class FakeBucket(BaseModel):
def __init__(self, name, region_name):
@ -307,6 +327,8 @@ class FakeBucket(BaseModel):
self.policy = None
self.website_configuration = None
self.acl = get_canned_acl('private')
self.tags = FakeTagging()
self.cors = []
@property
def location(self):
@ -336,6 +358,61 @@ class FakeBucket(BaseModel):
def delete_lifecycle(self):
self.rules = []
def set_cors(self, rules):
from moto.s3.exceptions import InvalidRequest, MalformedXML
self.cors = []
if len(rules) > 100:
raise MalformedXML()
# Python 2 and 3 have different string types for handling unicodes. Python 2 wants `basestring`,
# whereas Python 3 is OK with str. This causes issues with the XML parser, which returns
# unicode strings in Python 2. So, need to do this to make it work in both Python 2 and 3:
import sys
if sys.version_info >= (3, 0):
str_type = str
else:
str_type = basestring # noqa
for rule in rules:
assert isinstance(rule["AllowedMethod"], list) or isinstance(rule["AllowedMethod"], str_type)
assert isinstance(rule["AllowedOrigin"], list) or isinstance(rule["AllowedOrigin"], str_type)
assert isinstance(rule.get("AllowedHeader", []), list) or isinstance(rule.get("AllowedHeader", ""),
str_type)
assert isinstance(rule.get("ExposedHeader", []), list) or isinstance(rule.get("ExposedHeader", ""),
str_type)
assert isinstance(rule.get("MaxAgeSeconds", "0"), str_type)
if isinstance(rule["AllowedMethod"], str_type):
methods = [rule["AllowedMethod"]]
else:
methods = rule["AllowedMethod"]
for method in methods:
if method not in ["GET", "PUT", "HEAD", "POST", "DELETE"]:
raise InvalidRequest(method)
self.cors.append(CorsRule(
rule["AllowedMethod"],
rule["AllowedOrigin"],
rule.get("AllowedHeader"),
rule.get("ExposedHeader"),
rule.get("MaxAgeSecond")
))
def delete_cors(self):
self.cors = []
def set_tags(self, tagging):
self.tags = tagging
def delete_tags(self):
self.tags = FakeTagging()
@property
def tagging(self):
return self.tags
def set_website_configuration(self, website_configuration):
self.website_configuration = website_configuration
@ -510,6 +587,22 @@ class S3Backend(BaseBackend):
key.set_tagging(tagging)
return key
def put_bucket_tagging(self, bucket_name, tagging):
bucket = self.get_bucket(bucket_name)
bucket.set_tags(tagging)
def delete_bucket_tagging(self, bucket_name):
bucket = self.get_bucket(bucket_name)
bucket.delete_tags()
def put_bucket_cors(self, bucket_name, cors_rules):
bucket = self.get_bucket(bucket_name)
bucket.set_cors(cors_rules)
def delete_bucket_cors(self, bucket_name):
bucket = self.get_bucket(bucket_name)
bucket.delete_cors()
def initiate_multipart(self, bucket_name, key_name, metadata):
bucket = self.get_bucket(bucket_name)
new_multipart = FakeMultipart(key_name, metadata)

View File

@ -188,7 +188,8 @@ class ResponseObject(_TemplateEnvironmentMixin):
elif 'lifecycle' in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.rules:
return 404, {}, "NoSuchLifecycleConfiguration"
template = self.response_template(S3_NO_LIFECYCLE)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(
S3_BUCKET_LIFECYCLE_CONFIGURATION)
return template.render(rules=bucket.rules)
@ -205,11 +206,29 @@ class ResponseObject(_TemplateEnvironmentMixin):
elif 'website' in querystring:
website_configuration = self.backend.get_bucket_website_configuration(
bucket_name)
if not website_configuration:
template = self.response_template(S3_NO_BUCKET_WEBSITE_CONFIG)
return 404, {}, template.render(bucket_name=bucket_name)
return website_configuration
elif 'acl' in querystring:
bucket = self.backend.get_bucket(bucket_name)
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return template.render(obj=bucket)
elif 'tagging' in querystring:
bucket = self.backend.get_bucket(bucket_name)
# "Special Error" if no tags:
if len(bucket.tagging.tag_set.tags) == 0:
template = self.response_template(S3_NO_BUCKET_TAGGING)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_TAGGING_RESPONSE)
return template.render(bucket=bucket)
elif "cors" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if len(bucket.cors) == 0:
template = self.response_template(S3_NO_CORS_CONFIG)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_CORS_RESPONSE)
return template.render(bucket=bucket)
elif 'versions' in querystring:
delimiter = querystring.get('delimiter', [None])[0]
encoding_type = querystring.get('encoding-type', [None])[0]
@ -340,9 +359,20 @@ class ResponseObject(_TemplateEnvironmentMixin):
# TODO: Support the XML-based ACL format
self.backend.set_bucket_acl(bucket_name, acl)
return ""
elif "tagging" in querystring:
tagging = self._bucket_tagging_from_xml(body)
self.backend.put_bucket_tagging(bucket_name, tagging)
return ""
elif 'website' in querystring:
self.backend.set_bucket_website_configuration(bucket_name, body)
return ""
elif "cors" in querystring:
from moto.s3.exceptions import MalformedXML
try:
self.backend.put_bucket_cors(bucket_name, self._cors_from_xml(body))
return ""
except KeyError:
raise MalformedXML()
else:
if body:
try:
@ -366,6 +396,12 @@ class ResponseObject(_TemplateEnvironmentMixin):
if 'policy' in querystring:
self.backend.delete_bucket_policy(bucket_name, body)
return 204, {}, ""
elif "tagging" in querystring:
self.backend.delete_bucket_tagging(bucket_name)
return 204, {}, ""
elif "cors" in querystring:
self.backend.delete_bucket_cors(bucket_name)
return 204, {}, ""
elif 'lifecycle' in querystring:
bucket = self.backend.get_bucket(bucket_name)
bucket.delete_lifecycle()
@ -697,6 +733,27 @@ class ResponseObject(_TemplateEnvironmentMixin):
tagging = FakeTagging(tag_set)
return tagging
def _bucket_tagging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
tags = []
# Optional if no tags are being sent:
if parsed_xml['Tagging'].get('TagSet'):
for tag in parsed_xml['Tagging']['TagSet']['Tag']:
tags.append(FakeTag(tag['Key'], tag['Value']))
tag_set = FakeTagSet(tags)
tagging = FakeTagging(tag_set)
return tagging
def _cors_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
if isinstance(parsed_xml["CORSConfiguration"]["CORSRule"], list):
return [cors for cors in parsed_xml["CORSConfiguration"]["CORSRule"]]
return [parsed_xml["CORSConfiguration"]["CORSRule"]]
def _key_response_delete(self, bucket_name, query, key_name, headers):
if query.get('uploadId'):
upload_id = query['uploadId'][0]
@ -1023,6 +1080,46 @@ S3_OBJECT_TAGGING_RESPONSE = """\
</TagSet>
</Tagging>"""
S3_BUCKET_TAGGING_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<Tagging>
<TagSet>
{% for tag in bucket.tagging.tag_set.tags %}
<Tag>
<Key>{{ tag.key }}</Key>
<Value>{{ tag.value }}</Value>
</Tag>
{% endfor %}
</TagSet>
</Tagging>"""
S3_BUCKET_CORS_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<CORSConfiguration>
{% for cors in bucket.cors %}
<CORSRule>
{% for origin in cors.allowed_origins %}
<AllowedOrigin>{{ origin }}</AllowedOrigin>
{% endfor %}
{% for method in cors.allowed_methods %}
<AllowedMethod>{{ method }}</AllowedMethod>
{% endfor %}
{% if cors.allowed_headers is not none %}
{% for header in cors.allowed_headers %}
<AllowedHeader>{{ header }}</AllowedHeader>
{% endfor %}
{% endif %}
{% if cors.exposed_headers is not none %}
{% for header in cors.exposed_headers %}
<ExposedHeader>{{ header }}</ExposedHeader>
{% endfor %}
{% endif %}
{% if cors.max_age_seconds is not none %}
<MaxAgeSeconds>{{ cors.max_age_seconds }}</MaxAgeSeconds>
{% endif %}
</CORSRule>
{% endfor %}
</CORSConfiguration>
"""
S3_OBJECT_COPY_RESPONSE = """\
<CopyObjectResult xmlns="http://doc.s3.amazonaws.com/2006-03-01">
<ETag>{{ key.etag }}</ETag>
@ -1115,3 +1212,53 @@ S3_NO_POLICY = """<?xml version="1.0" encoding="UTF-8"?>
<HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error>
"""
S3_NO_LIFECYCLE = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchLifecycleConfiguration</Code>
<Message>The lifecycle configuration does not exist</Message>
<BucketName>{{ bucket_name }}</BucketName>
<RequestId>44425877V1D0A2F9</RequestId>
<HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error>
"""
S3_NO_BUCKET_TAGGING = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchTagSet</Code>
<Message>The TagSet does not exist</Message>
<BucketName>{{ bucket_name }}</BucketName>
<RequestId>44425877V1D0A2F9</RequestId>
<HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error>
"""
S3_NO_BUCKET_WEBSITE_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchWebsiteConfiguration</Code>
<Message>The specified bucket does not have a website configuration</Message>
<BucketName>{{ bucket_name }}</BucketName>
<RequestId>44425877V1D0A2F9</RequestId>
<HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error>
"""
S3_INVALID_CORS_REQUEST = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchWebsiteConfiguration</Code>
<Message>The specified bucket does not have a website configuration</Message>
<BucketName>{{ bucket_name }}</BucketName>
<RequestId>44425877V1D0A2F9</RequestId>
<HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error>
"""
S3_NO_CORS_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NoSuchCORSConfiguration</Code>
<Message>The CORS configuration does not exist</Message>
<BucketName>{{ bucket_name }}</BucketName>
<RequestId>44425877V1D0A2F9</RequestId>
<HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error>
"""

View File

@ -1391,6 +1391,268 @@ def test_boto3_put_object_with_tagging():
resp['TagSet'].should.contain({'Key': 'foo', 'Value': 'bar'})
@mock_s3
def test_boto3_put_bucket_tagging():
s3 = boto3.client("s3", region_name="us-east-1")
bucket_name = "mybucket"
s3.create_bucket(Bucket=bucket_name)
resp = s3.put_bucket_tagging(Bucket=bucket_name,
Tagging={
"TagSet": [
{
"Key": "TagOne",
"Value": "ValueOne"
},
{
"Key": "TagTwo",
"Value": "ValueTwo"
}
]
})
resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
# No tags is also OK:
resp = s3.put_bucket_tagging(Bucket=bucket_name, Tagging={
"TagSet": []
})
resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
@mock_s3
def test_boto3_get_bucket_tagging():
s3 = boto3.client("s3", region_name="us-east-1")
bucket_name = "mybucket"
s3.create_bucket(Bucket=bucket_name)
s3.put_bucket_tagging(Bucket=bucket_name,
Tagging={
"TagSet": [
{
"Key": "TagOne",
"Value": "ValueOne"
},
{
"Key": "TagTwo",
"Value": "ValueTwo"
}
]
})
# Get the tags for the bucket:
resp = s3.get_bucket_tagging(Bucket=bucket_name)
resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
len(resp["TagSet"]).should.equal(2)
# With no tags:
s3.put_bucket_tagging(Bucket=bucket_name, Tagging={
"TagSet": []
})
with assert_raises(ClientError) as err:
s3.get_bucket_tagging(Bucket=bucket_name)
e = err.exception
e.response["Error"]["Code"].should.equal("NoSuchTagSet")
e.response["Error"]["Message"].should.equal("The TagSet does not exist")
@mock_s3
def test_boto3_delete_bucket_tagging():
s3 = boto3.client("s3", region_name="us-east-1")
bucket_name = "mybucket"
s3.create_bucket(Bucket=bucket_name)
s3.put_bucket_tagging(Bucket=bucket_name,
Tagging={
"TagSet": [
{
"Key": "TagOne",
"Value": "ValueOne"
},
{
"Key": "TagTwo",
"Value": "ValueTwo"
}
]
})
resp = s3.delete_bucket_tagging(Bucket=bucket_name)
resp['ResponseMetadata']['HTTPStatusCode'].should.equal(204)
with assert_raises(ClientError) as err:
s3.get_bucket_tagging(Bucket=bucket_name)
e = err.exception
e.response["Error"]["Code"].should.equal("NoSuchTagSet")
e.response["Error"]["Message"].should.equal("The TagSet does not exist")
@mock_s3
def test_boto3_put_bucket_cors():
s3 = boto3.client("s3", region_name="us-east-1")
bucket_name = "mybucket"
s3.create_bucket(Bucket=bucket_name)
resp = s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
"CORSRules": [
{
"AllowedOrigins": [
"*"
],
"AllowedMethods": [
"GET",
"POST"
],
"AllowedHeaders": [
"Authorization"
],
"ExposeHeaders": [
"x-amz-request-id"
],
"MaxAgeSeconds": 123
},
{
"AllowedOrigins": [
"*"
],
"AllowedMethods": [
"PUT"
],
"AllowedHeaders": [
"Authorization"
],
"ExposeHeaders": [
"x-amz-request-id"
],
"MaxAgeSeconds": 123
}
]
})
resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
with assert_raises(ClientError) as err:
s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
"CORSRules": [
{
"AllowedOrigins": [
"*"
],
"AllowedMethods": [
"NOTREAL",
"POST"
]
}
]
})
e = err.exception
e.response["Error"]["Code"].should.equal("InvalidRequest")
e.response["Error"]["Message"].should.equal("Found unsupported HTTP method in CORS config. "
"Unsupported method is NOTREAL")
with assert_raises(ClientError) as err:
s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
"CORSRules": []
})
e = err.exception
e.response["Error"]["Code"].should.equal("MalformedXML")
# And 101:
many_rules = [{"AllowedOrigins": ["*"], "AllowedMethods": ["GET"]}] * 101
with assert_raises(ClientError) as err:
s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
"CORSRules": many_rules
})
e = err.exception
e.response["Error"]["Code"].should.equal("MalformedXML")
@mock_s3
def test_boto3_get_bucket_cors():
s3 = boto3.client("s3", region_name="us-east-1")
bucket_name = "mybucket"
s3.create_bucket(Bucket=bucket_name)
# Without CORS:
with assert_raises(ClientError) as err:
s3.get_bucket_cors(Bucket=bucket_name)
e = err.exception
e.response["Error"]["Code"].should.equal("NoSuchCORSConfiguration")
e.response["Error"]["Message"].should.equal("The CORS configuration does not exist")
s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
"CORSRules": [
{
"AllowedOrigins": [
"*"
],
"AllowedMethods": [
"GET",
"POST"
],
"AllowedHeaders": [
"Authorization"
],
"ExposeHeaders": [
"x-amz-request-id"
],
"MaxAgeSeconds": 123
},
{
"AllowedOrigins": [
"*"
],
"AllowedMethods": [
"PUT"
],
"AllowedHeaders": [
"Authorization"
],
"ExposeHeaders": [
"x-amz-request-id"
],
"MaxAgeSeconds": 123
}
]
})
resp = s3.get_bucket_cors(Bucket=bucket_name)
resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
len(resp["CORSRules"]).should.equal(2)
@mock_s3
def test_boto3_delete_bucket_cors():
s3 = boto3.client("s3", region_name="us-east-1")
bucket_name = "mybucket"
s3.create_bucket(Bucket=bucket_name)
s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
"CORSRules": [
{
"AllowedOrigins": [
"*"
],
"AllowedMethods": [
"GET"
]
}
]
})
resp = s3.delete_bucket_cors(Bucket=bucket_name)
resp['ResponseMetadata']['HTTPStatusCode'].should.equal(204)
# Verify deletion:
with assert_raises(ClientError) as err:
s3.get_bucket_cors(Bucket=bucket_name)
e = err.exception
e.response["Error"]["Code"].should.equal("NoSuchCORSConfiguration")
e.response["Error"]["Message"].should.equal("The CORS configuration does not exist")
@mock_s3
def test_boto3_put_object_tagging():
s3 = boto3.client('s3', region_name='us-east-1')