diff --git a/moto/s3/responses.py b/moto/s3/responses.py
index 10e68d569..603571c0d 100644
--- a/moto/s3/responses.py
+++ b/moto/s3/responses.py
@@ -1079,6 +1079,10 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
if key:
if not key.acl.public_read and not signed_url:
return 403, {}, ""
+ elif signed_url:
+ # coming in from requests.get(s3.generate_presigned_url())
+ if self._invalid_headers(request.url, dict(request.headers)):
+ return 403, {}, S3_INVALID_PRESIGNED_PARAMETERS
if hasattr(request, "body"):
# Boto
@@ -1287,6 +1291,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
)
request.streaming = True
metadata = metadata_from_headers(request.headers)
+ metadata.update(metadata_from_headers(query))
new_key.set_metadata(metadata)
new_key.set_acl(acl)
new_key.website_redirect_location = request.headers.get(
@@ -1672,6 +1677,29 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
"Method POST had only been implemented for multipart uploads and restore operations, so far"
)
+ def _invalid_headers(self, url, headers):
+ """
+ Verify whether the provided metadata in the URL is also present in the headers
+ :param url: .../file.txt&content-type=app%2Fjson&Signature=..
+ :param headers: Content-Type=app/json
+ :return: True or False
+ """
+ metadata_to_check = {
+ "content-disposition": "Content-Disposition",
+ "content-encoding": "Content-Encoding",
+ "content-language": "Content-Language",
+ "content-length": "Content-Length",
+ "content-md5": "Content-MD5",
+ "content-type": "Content-Type",
+ }
+ for url_key, header_key in metadata_to_check.items():
+ metadata_in_url = re.search(url_key + "=(.+?)(&.+$|$)", url)
+ if metadata_in_url:
+ url_value = unquote(metadata_in_url.group(1))
+ if header_key not in headers or (url_value != headers[header_key]):
+ return True
+ return False
+
S3ResponseInstance = ResponseObject(s3_backend)
@@ -2214,6 +2242,15 @@ S3_ENCRYPTION_CONFIG = """
"""
+S3_INVALID_PRESIGNED_PARAMETERS = """
+
+ SignatureDoesNotMatch
+ The request signature we calculated does not match the signature you provided. Check your key and signing method.
+ 0D68A23BB2E2215B
+ 9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=
+
+"""
+
S3_NO_ENCRYPTION = """
ServerSideEncryptionConfigurationNotFoundError
diff --git a/moto/s3/utils.py b/moto/s3/utils.py
index 014e98ca9..2cdb7e862 100644
--- a/moto/s3/utils.py
+++ b/moto/s3/utils.py
@@ -6,6 +6,7 @@ from boto.s3.key import Key
import re
import six
from six.moves.urllib.parse import urlparse, unquote, quote
+from requests.structures import CaseInsensitiveDict
import sys
@@ -62,7 +63,7 @@ def parse_region_from_url(url):
def metadata_from_headers(headers):
- metadata = {}
+ metadata = CaseInsensitiveDict()
meta_regex = re.compile(r"^x-amz-meta-([a-zA-Z0-9\-_]+)$", flags=re.IGNORECASE)
for header, value in headers.items():
if isinstance(header, six.string_types):
@@ -75,7 +76,11 @@ def metadata_from_headers(headers):
# Check for special metadata that doesn't start with x-amz-meta
meta_key = header
if meta_key:
- metadata[meta_key] = headers[header]
+ metadata[meta_key] = (
+ headers[header][0]
+ if type(headers[header]) == list
+ else headers[header]
+ )
return metadata
diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py
index 8ac227f4f..57f745437 100644
--- a/tests/test_s3/test_s3.py
+++ b/tests/test_s3/test_s3.py
@@ -4583,3 +4583,106 @@ def test_encryption():
conn.delete_bucket_encryption(Bucket="mybucket")
with assert_raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
+
+
+@mock_s3
+def test_presigned_url_restrict_parameters():
+ # Only specific params can be set
+ # Ensure error is thrown when adding custom metadata this way
+ bucket = str(uuid.uuid4())
+ key = "file.txt"
+ conn = boto3.resource("s3", region_name="us-east-1")
+ conn.create_bucket(Bucket=bucket)
+ s3 = boto3.client("s3", region_name="us-east-1")
+
+ # Create a pre-signed url with some metadata.
+ with assert_raises(botocore.exceptions.ParamValidationError) as err:
+ s3.generate_presigned_url(
+ ClientMethod="put_object",
+ Params={"Bucket": bucket, "Key": key, "Unknown": "metadata"},
+ )
+ assert str(err.exception).should.equal(
+ 'Parameter validation failed:\nUnknown parameter in input: "Unknown", must be one of: ACL, Body, Bucket, CacheControl, ContentDisposition, ContentEncoding, ContentLanguage, ContentLength, ContentMD5, ContentType, Expires, GrantFullControl, GrantRead, GrantReadACP, GrantWriteACP, Key, Metadata, ServerSideEncryption, StorageClass, WebsiteRedirectLocation, SSECustomerAlgorithm, SSECustomerKey, SSECustomerKeyMD5, SSEKMSKeyId, SSEKMSEncryptionContext, RequestPayer, Tagging, ObjectLockMode, ObjectLockRetainUntilDate, ObjectLockLegalHoldStatus'
+ )
+
+ s3.delete_bucket(Bucket=bucket)
+
+
+@mock_s3
+def test_presigned_put_url_with_approved_headers():
+ bucket = str(uuid.uuid4())
+ key = "file.txt"
+ content = b"filecontent"
+ expected_contenttype = "app/sth"
+ conn = boto3.resource("s3", region_name="us-east-1")
+ conn.create_bucket(Bucket=bucket)
+ s3 = boto3.client("s3", region_name="us-east-1")
+
+ # Create a pre-signed url with some metadata.
+ url = s3.generate_presigned_url(
+ ClientMethod="put_object",
+ Params={"Bucket": bucket, "Key": key, "ContentType": expected_contenttype},
+ )
+
+ # Verify S3 throws an error when the header is not provided
+ response = requests.put(url, data=content)
+ response.status_code.should.equal(403)
+ str(response.content).should.contain("SignatureDoesNotMatch
")
+ str(response.content).should.contain(
+ "The request signature we calculated does not match the signature you provided. Check your key and signing method."
+ )
+
+ # Verify S3 throws an error when the header has the wrong value
+ response = requests.put(
+ url, data=content, headers={"Content-Type": "application/unknown"}
+ )
+ response.status_code.should.equal(403)
+ str(response.content).should.contain("SignatureDoesNotMatch
")
+ str(response.content).should.contain(
+ "The request signature we calculated does not match the signature you provided. Check your key and signing method."
+ )
+
+ # Verify S3 uploads correctly when providing the meta data
+ response = requests.put(
+ url, data=content, headers={"Content-Type": expected_contenttype}
+ )
+ response.status_code.should.equal(200)
+
+ # Assert the object exists
+ obj = s3.get_object(Bucket=bucket, Key=key)
+ obj["ContentType"].should.equal(expected_contenttype)
+ obj["ContentLength"].should.equal(11)
+ obj["Body"].read().should.equal(content)
+ obj["Metadata"].should.equal({})
+
+ s3.delete_object(Bucket=bucket, Key=key)
+ s3.delete_bucket(Bucket=bucket)
+
+
+@mock_s3
+def test_presigned_put_url_with_custom_headers():
+ bucket = str(uuid.uuid4())
+ key = "file.txt"
+ content = b"filecontent"
+ conn = boto3.resource("s3", region_name="us-east-1")
+ conn.create_bucket(Bucket=bucket)
+ s3 = boto3.client("s3", region_name="us-east-1")
+
+ # Create a pre-signed url with some metadata.
+ url = s3.generate_presigned_url(
+ ClientMethod="put_object",
+ Params={"Bucket": bucket, "Key": key, "Metadata": {"venue": "123"}},
+ )
+
+ # Verify S3 uploads correctly when providing the meta data
+ response = requests.put(url, data=content)
+ response.status_code.should.equal(200)
+
+ # Assert the object exists
+ obj = s3.get_object(Bucket=bucket, Key=key)
+ obj["ContentLength"].should.equal(11)
+ obj["Body"].read().should.equal(content)
+ obj["Metadata"].should.equal({"venue": "123"})
+
+ s3.delete_object(Bucket=bucket, Key=key)
+ s3.delete_bucket(Bucket=bucket)