diff --git a/moto/s3/exceptions.py b/moto/s3/exceptions.py
index df817ba78..24704e7ef 100644
--- a/moto/s3/exceptions.py
+++ b/moto/s3/exceptions.py
@@ -91,3 +91,23 @@ class EntityTooSmall(S3ClientError):
"EntityTooSmall",
"Your proposed upload is smaller than the minimum allowed object size.",
*args, **kwargs)
+
+
+class InvalidRequest(S3ClientError):
+ code = 400
+
+ def __init__(self, method, *args, **kwargs):
+ super(InvalidRequest, self).__init__(
+ "InvalidRequest",
+ "Found unsupported HTTP method in CORS config. Unsupported method is {}".format(method),
+ *args, **kwargs)
+
+
+class MalformedXML(S3ClientError):
+ code = 400
+
+ def __init__(self, *args, **kwargs):
+ super(MalformedXML, self).__init__(
+ "MalformedXML",
+ "The XML you provided was not well-formed or did not validate against our published schema",
+ *args, **kwargs)
diff --git a/moto/s3/models.py b/moto/s3/models.py
index 4ea33adb6..abe92bdf1 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -295,6 +295,26 @@ class LifecycleRule(BaseModel):
self.storage_class = storage_class
+class CorsRule(BaseModel):
+
+ def __init__(self, allowed_methods, allowed_origins, allowed_headers=None, expose_headers=None,
+ max_age_seconds=None):
+ # Python 2 and 3 have different string types for handling unicodes. Python 2 wants `basestring`,
+ # whereas Python 3 is OK with str. This causes issues with the XML parser, which returns
+ # unicode strings in Python 2. So, need to do this to make it work in both Python 2 and 3:
+ import sys
+ if sys.version_info >= (3, 0):
+ str_type = str
+ else:
+ str_type = basestring # noqa
+
+ self.allowed_methods = [allowed_methods] if isinstance(allowed_methods, str_type) else allowed_methods
+ self.allowed_origins = [allowed_origins] if isinstance(allowed_origins, str_type) else allowed_origins
+ self.allowed_headers = [allowed_headers] if isinstance(allowed_headers, str_type) else allowed_headers
+ self.exposed_headers = [expose_headers] if isinstance(expose_headers, str_type) else expose_headers
+ self.max_age_seconds = max_age_seconds
+
+
class FakeBucket(BaseModel):
def __init__(self, name, region_name):
@@ -307,6 +327,8 @@ class FakeBucket(BaseModel):
self.policy = None
self.website_configuration = None
self.acl = get_canned_acl('private')
+ self.tags = FakeTagging()
+ self.cors = []
@property
def location(self):
@@ -336,6 +358,61 @@ class FakeBucket(BaseModel):
def delete_lifecycle(self):
self.rules = []
+ def set_cors(self, rules):
+ from moto.s3.exceptions import InvalidRequest, MalformedXML
+ self.cors = []
+
+ if len(rules) > 100:
+ raise MalformedXML()
+
+ # Python 2 and 3 have different string types for handling unicodes. Python 2 wants `basestring`,
+ # whereas Python 3 is OK with str. This causes issues with the XML parser, which returns
+ # unicode strings in Python 2. So, need to do this to make it work in both Python 2 and 3:
+ import sys
+ if sys.version_info >= (3, 0):
+ str_type = str
+ else:
+ str_type = basestring # noqa
+
+ for rule in rules:
+ assert isinstance(rule["AllowedMethod"], list) or isinstance(rule["AllowedMethod"], str_type)
+ assert isinstance(rule["AllowedOrigin"], list) or isinstance(rule["AllowedOrigin"], str_type)
+ assert isinstance(rule.get("AllowedHeader", []), list) or isinstance(rule.get("AllowedHeader", ""),
+ str_type)
+ assert isinstance(rule.get("ExposedHeader", []), list) or isinstance(rule.get("ExposedHeader", ""),
+ str_type)
+ assert isinstance(rule.get("MaxAgeSeconds", "0"), str_type)
+
+ if isinstance(rule["AllowedMethod"], str_type):
+ methods = [rule["AllowedMethod"]]
+ else:
+ methods = rule["AllowedMethod"]
+
+ for method in methods:
+ if method not in ["GET", "PUT", "HEAD", "POST", "DELETE"]:
+ raise InvalidRequest(method)
+
+ self.cors.append(CorsRule(
+ rule["AllowedMethod"],
+ rule["AllowedOrigin"],
+ rule.get("AllowedHeader"),
+ rule.get("ExposedHeader"),
+ rule.get("MaxAgeSecond")
+ ))
+
+ def delete_cors(self):
+ self.cors = []
+
+ def set_tags(self, tagging):
+ self.tags = tagging
+
+ def delete_tags(self):
+ self.tags = FakeTagging()
+
+ @property
+ def tagging(self):
+ return self.tags
+
def set_website_configuration(self, website_configuration):
self.website_configuration = website_configuration
@@ -510,6 +587,22 @@ class S3Backend(BaseBackend):
key.set_tagging(tagging)
return key
+ def put_bucket_tagging(self, bucket_name, tagging):
+ bucket = self.get_bucket(bucket_name)
+ bucket.set_tags(tagging)
+
+ def delete_bucket_tagging(self, bucket_name):
+ bucket = self.get_bucket(bucket_name)
+ bucket.delete_tags()
+
+ def put_bucket_cors(self, bucket_name, cors_rules):
+ bucket = self.get_bucket(bucket_name)
+ bucket.set_cors(cors_rules)
+
+ def delete_bucket_cors(self, bucket_name):
+ bucket = self.get_bucket(bucket_name)
+ bucket.delete_cors()
+
def initiate_multipart(self, bucket_name, key_name, metadata):
bucket = self.get_bucket(bucket_name)
new_multipart = FakeMultipart(key_name, metadata)
diff --git a/moto/s3/responses.py b/moto/s3/responses.py
index dea80518d..2a696e551 100644
--- a/moto/s3/responses.py
+++ b/moto/s3/responses.py
@@ -188,7 +188,8 @@ class ResponseObject(_TemplateEnvironmentMixin):
elif 'lifecycle' in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.rules:
- return 404, {}, "NoSuchLifecycleConfiguration"
+ template = self.response_template(S3_NO_LIFECYCLE)
+ return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(
S3_BUCKET_LIFECYCLE_CONFIGURATION)
return template.render(rules=bucket.rules)
@@ -205,11 +206,29 @@ class ResponseObject(_TemplateEnvironmentMixin):
elif 'website' in querystring:
website_configuration = self.backend.get_bucket_website_configuration(
bucket_name)
+ if not website_configuration:
+ template = self.response_template(S3_NO_BUCKET_WEBSITE_CONFIG)
+ return 404, {}, template.render(bucket_name=bucket_name)
return website_configuration
elif 'acl' in querystring:
bucket = self.backend.get_bucket(bucket_name)
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return template.render(obj=bucket)
+ elif 'tagging' in querystring:
+ bucket = self.backend.get_bucket(bucket_name)
+ # "Special Error" if no tags:
+ if len(bucket.tagging.tag_set.tags) == 0:
+ template = self.response_template(S3_NO_BUCKET_TAGGING)
+ return 404, {}, template.render(bucket_name=bucket_name)
+ template = self.response_template(S3_BUCKET_TAGGING_RESPONSE)
+ return template.render(bucket=bucket)
+ elif "cors" in querystring:
+ bucket = self.backend.get_bucket(bucket_name)
+ if len(bucket.cors) == 0:
+ template = self.response_template(S3_NO_CORS_CONFIG)
+ return 404, {}, template.render(bucket_name=bucket_name)
+ template = self.response_template(S3_BUCKET_CORS_RESPONSE)
+ return template.render(bucket=bucket)
elif 'versions' in querystring:
delimiter = querystring.get('delimiter', [None])[0]
encoding_type = querystring.get('encoding-type', [None])[0]
@@ -340,9 +359,20 @@ class ResponseObject(_TemplateEnvironmentMixin):
# TODO: Support the XML-based ACL format
self.backend.set_bucket_acl(bucket_name, acl)
return ""
+ elif "tagging" in querystring:
+ tagging = self._bucket_tagging_from_xml(body)
+ self.backend.put_bucket_tagging(bucket_name, tagging)
+ return ""
elif 'website' in querystring:
self.backend.set_bucket_website_configuration(bucket_name, body)
return ""
+ elif "cors" in querystring:
+ from moto.s3.exceptions import MalformedXML
+ try:
+ self.backend.put_bucket_cors(bucket_name, self._cors_from_xml(body))
+ return ""
+ except KeyError:
+ raise MalformedXML()
else:
if body:
try:
@@ -366,6 +396,12 @@ class ResponseObject(_TemplateEnvironmentMixin):
if 'policy' in querystring:
self.backend.delete_bucket_policy(bucket_name, body)
return 204, {}, ""
+ elif "tagging" in querystring:
+ self.backend.delete_bucket_tagging(bucket_name)
+ return 204, {}, ""
+ elif "cors" in querystring:
+ self.backend.delete_bucket_cors(bucket_name)
+ return 204, {}, ""
elif 'lifecycle' in querystring:
bucket = self.backend.get_bucket(bucket_name)
bucket.delete_lifecycle()
@@ -697,6 +733,27 @@ class ResponseObject(_TemplateEnvironmentMixin):
tagging = FakeTagging(tag_set)
return tagging
+ def _bucket_tagging_from_xml(self, xml):
+ parsed_xml = xmltodict.parse(xml)
+
+ tags = []
+ # Optional if no tags are being sent:
+ if parsed_xml['Tagging'].get('TagSet'):
+ for tag in parsed_xml['Tagging']['TagSet']['Tag']:
+ tags.append(FakeTag(tag['Key'], tag['Value']))
+
+ tag_set = FakeTagSet(tags)
+ tagging = FakeTagging(tag_set)
+ return tagging
+
+ def _cors_from_xml(self, xml):
+ parsed_xml = xmltodict.parse(xml)
+
+ if isinstance(parsed_xml["CORSConfiguration"]["CORSRule"], list):
+ return [cors for cors in parsed_xml["CORSConfiguration"]["CORSRule"]]
+
+ return [parsed_xml["CORSConfiguration"]["CORSRule"]]
+
def _key_response_delete(self, bucket_name, query, key_name, headers):
if query.get('uploadId'):
upload_id = query['uploadId'][0]
@@ -1023,6 +1080,46 @@ S3_OBJECT_TAGGING_RESPONSE = """\
"""
+S3_BUCKET_TAGGING_RESPONSE = """
+
+
+ {% for tag in bucket.tagging.tag_set.tags %}
+
+ {{ tag.key }}
+ {{ tag.value }}
+
+ {% endfor %}
+
+"""
+
+S3_BUCKET_CORS_RESPONSE = """
+
+ {% for cors in bucket.cors %}
+
+ {% for origin in cors.allowed_origins %}
+ {{ origin }}
+ {% endfor %}
+ {% for method in cors.allowed_methods %}
+ {{ method }}
+ {% endfor %}
+ {% if cors.allowed_headers is not none %}
+ {% for header in cors.allowed_headers %}
+ {{ header }}
+ {% endfor %}
+ {% endif %}
+ {% if cors.exposed_headers is not none %}
+ {% for header in cors.exposed_headers %}
+ {{ header }}
+ {% endfor %}
+ {% endif %}
+ {% if cors.max_age_seconds is not none %}
+ {{ cors.max_age_seconds }}
+ {% endif %}
+
+ {% endfor %}
+
+"""
+
S3_OBJECT_COPY_RESPONSE = """\
{{ key.etag }}
@@ -1115,3 +1212,53 @@ S3_NO_POLICY = """
9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
"""
+
+S3_NO_LIFECYCLE = """
+
+ NoSuchLifecycleConfiguration
+ The lifecycle configuration does not exist
+ {{ bucket_name }}
+ 44425877V1D0A2F9
+ 9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
+
+"""
+
+S3_NO_BUCKET_TAGGING = """
+
+ NoSuchTagSet
+ The TagSet does not exist
+ {{ bucket_name }}
+ 44425877V1D0A2F9
+ 9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
+
+"""
+
+S3_NO_BUCKET_WEBSITE_CONFIG = """
+
+ NoSuchWebsiteConfiguration
+ The specified bucket does not have a website configuration
+ {{ bucket_name }}
+ 44425877V1D0A2F9
+ 9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
+
+"""
+
+S3_INVALID_CORS_REQUEST = """
+
+ NoSuchWebsiteConfiguration
+ The specified bucket does not have a website configuration
+ {{ bucket_name }}
+ 44425877V1D0A2F9
+ 9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
+
+"""
+
+S3_NO_CORS_CONFIG = """
+
+ NoSuchCORSConfiguration
+ The CORS configuration does not exist
+ {{ bucket_name }}
+ 44425877V1D0A2F9
+ 9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
+
+"""
diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py
index 3832026eb..331452a7d 100644
--- a/tests/test_s3/test_s3.py
+++ b/tests/test_s3/test_s3.py
@@ -1391,6 +1391,268 @@ def test_boto3_put_object_with_tagging():
resp['TagSet'].should.contain({'Key': 'foo', 'Value': 'bar'})
+@mock_s3
+def test_boto3_put_bucket_tagging():
+ s3 = boto3.client("s3", region_name="us-east-1")
+ bucket_name = "mybucket"
+ s3.create_bucket(Bucket=bucket_name)
+
+ resp = s3.put_bucket_tagging(Bucket=bucket_name,
+ Tagging={
+ "TagSet": [
+ {
+ "Key": "TagOne",
+ "Value": "ValueOne"
+ },
+ {
+ "Key": "TagTwo",
+ "Value": "ValueTwo"
+ }
+ ]
+ })
+
+ resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
+
+ # No tags is also OK:
+ resp = s3.put_bucket_tagging(Bucket=bucket_name, Tagging={
+ "TagSet": []
+ })
+ resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
+
+
+@mock_s3
+def test_boto3_get_bucket_tagging():
+ s3 = boto3.client("s3", region_name="us-east-1")
+ bucket_name = "mybucket"
+ s3.create_bucket(Bucket=bucket_name)
+ s3.put_bucket_tagging(Bucket=bucket_name,
+ Tagging={
+ "TagSet": [
+ {
+ "Key": "TagOne",
+ "Value": "ValueOne"
+ },
+ {
+ "Key": "TagTwo",
+ "Value": "ValueTwo"
+ }
+ ]
+ })
+
+ # Get the tags for the bucket:
+ resp = s3.get_bucket_tagging(Bucket=bucket_name)
+ resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
+ len(resp["TagSet"]).should.equal(2)
+
+ # With no tags:
+ s3.put_bucket_tagging(Bucket=bucket_name, Tagging={
+ "TagSet": []
+ })
+
+ with assert_raises(ClientError) as err:
+ s3.get_bucket_tagging(Bucket=bucket_name)
+
+ e = err.exception
+ e.response["Error"]["Code"].should.equal("NoSuchTagSet")
+ e.response["Error"]["Message"].should.equal("The TagSet does not exist")
+
+
+@mock_s3
+def test_boto3_delete_bucket_tagging():
+ s3 = boto3.client("s3", region_name="us-east-1")
+ bucket_name = "mybucket"
+ s3.create_bucket(Bucket=bucket_name)
+
+ s3.put_bucket_tagging(Bucket=bucket_name,
+ Tagging={
+ "TagSet": [
+ {
+ "Key": "TagOne",
+ "Value": "ValueOne"
+ },
+ {
+ "Key": "TagTwo",
+ "Value": "ValueTwo"
+ }
+ ]
+ })
+
+ resp = s3.delete_bucket_tagging(Bucket=bucket_name)
+ resp['ResponseMetadata']['HTTPStatusCode'].should.equal(204)
+
+ with assert_raises(ClientError) as err:
+ s3.get_bucket_tagging(Bucket=bucket_name)
+
+ e = err.exception
+ e.response["Error"]["Code"].should.equal("NoSuchTagSet")
+ e.response["Error"]["Message"].should.equal("The TagSet does not exist")
+
+
+@mock_s3
+def test_boto3_put_bucket_cors():
+ s3 = boto3.client("s3", region_name="us-east-1")
+ bucket_name = "mybucket"
+ s3.create_bucket(Bucket=bucket_name)
+
+ resp = s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
+ "CORSRules": [
+ {
+ "AllowedOrigins": [
+ "*"
+ ],
+ "AllowedMethods": [
+ "GET",
+ "POST"
+ ],
+ "AllowedHeaders": [
+ "Authorization"
+ ],
+ "ExposeHeaders": [
+ "x-amz-request-id"
+ ],
+ "MaxAgeSeconds": 123
+ },
+ {
+ "AllowedOrigins": [
+ "*"
+ ],
+ "AllowedMethods": [
+ "PUT"
+ ],
+ "AllowedHeaders": [
+ "Authorization"
+ ],
+ "ExposeHeaders": [
+ "x-amz-request-id"
+ ],
+ "MaxAgeSeconds": 123
+ }
+ ]
+ })
+
+ resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
+
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
+ "CORSRules": [
+ {
+ "AllowedOrigins": [
+ "*"
+ ],
+ "AllowedMethods": [
+ "NOTREAL",
+ "POST"
+ ]
+ }
+ ]
+ })
+ e = err.exception
+ e.response["Error"]["Code"].should.equal("InvalidRequest")
+ e.response["Error"]["Message"].should.equal("Found unsupported HTTP method in CORS config. "
+ "Unsupported method is NOTREAL")
+
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
+ "CORSRules": []
+ })
+ e = err.exception
+ e.response["Error"]["Code"].should.equal("MalformedXML")
+
+ # And 101:
+ many_rules = [{"AllowedOrigins": ["*"], "AllowedMethods": ["GET"]}] * 101
+ with assert_raises(ClientError) as err:
+ s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
+ "CORSRules": many_rules
+ })
+ e = err.exception
+ e.response["Error"]["Code"].should.equal("MalformedXML")
+
+
+@mock_s3
+def test_boto3_get_bucket_cors():
+ s3 = boto3.client("s3", region_name="us-east-1")
+ bucket_name = "mybucket"
+ s3.create_bucket(Bucket=bucket_name)
+
+ # Without CORS:
+ with assert_raises(ClientError) as err:
+ s3.get_bucket_cors(Bucket=bucket_name)
+
+ e = err.exception
+ e.response["Error"]["Code"].should.equal("NoSuchCORSConfiguration")
+ e.response["Error"]["Message"].should.equal("The CORS configuration does not exist")
+
+ s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
+ "CORSRules": [
+ {
+ "AllowedOrigins": [
+ "*"
+ ],
+ "AllowedMethods": [
+ "GET",
+ "POST"
+ ],
+ "AllowedHeaders": [
+ "Authorization"
+ ],
+ "ExposeHeaders": [
+ "x-amz-request-id"
+ ],
+ "MaxAgeSeconds": 123
+ },
+ {
+ "AllowedOrigins": [
+ "*"
+ ],
+ "AllowedMethods": [
+ "PUT"
+ ],
+ "AllowedHeaders": [
+ "Authorization"
+ ],
+ "ExposeHeaders": [
+ "x-amz-request-id"
+ ],
+ "MaxAgeSeconds": 123
+ }
+ ]
+ })
+
+ resp = s3.get_bucket_cors(Bucket=bucket_name)
+ resp['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
+ len(resp["CORSRules"]).should.equal(2)
+
+
+@mock_s3
+def test_boto3_delete_bucket_cors():
+ s3 = boto3.client("s3", region_name="us-east-1")
+ bucket_name = "mybucket"
+ s3.create_bucket(Bucket=bucket_name)
+ s3.put_bucket_cors(Bucket=bucket_name, CORSConfiguration={
+ "CORSRules": [
+ {
+ "AllowedOrigins": [
+ "*"
+ ],
+ "AllowedMethods": [
+ "GET"
+ ]
+ }
+ ]
+ })
+
+ resp = s3.delete_bucket_cors(Bucket=bucket_name)
+ resp['ResponseMetadata']['HTTPStatusCode'].should.equal(204)
+
+ # Verify deletion:
+ with assert_raises(ClientError) as err:
+ s3.get_bucket_cors(Bucket=bucket_name)
+
+ e = err.exception
+ e.response["Error"]["Code"].should.equal("NoSuchCORSConfiguration")
+ e.response["Error"]["Message"].should.equal("The CORS configuration does not exist")
+
+
@mock_s3
def test_boto3_put_object_tagging():
s3 = boto3.client('s3', region_name='us-east-1')