moto/tests/test_s3/test_s3.py

3430 lines
117 KiB
Python

import datetime
from gzip import GzipFile
from io import BytesIO
import json
import os
import pickle
from unittest import SkipTest, mock
from urllib.parse import urlparse, parse_qs
import uuid
from uuid import uuid4
import zlib
import boto3
from botocore.client import ClientError
import botocore.exceptions
from botocore.handlers import disable_signing
from freezegun import freeze_time
import pytest
import requests
from moto import settings, mock_s3, mock_config
from moto.moto_api import state_manager
from moto.s3.responses import DEFAULT_REGION_NAME
import moto.s3.models as s3model
class MyModel:
def __init__(self, name, value, metadata=None):
self.name = name
self.value = value
self.metadata = metadata or {}
def save(self):
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.put_object(
Bucket="mybucket", Key=self.name, Body=self.value, Metadata=self.metadata
)
@mock_s3
def test_keys_are_pickleable():
"""Keys must be pickleable due to boto3 implementation details."""
key = s3model.FakeKey("name", b"data!")
assert key.value == b"data!"
pickled = pickle.dumps(key)
loaded = pickle.loads(pickled)
assert loaded.value == key.value
@mock_s3
def test_my_model_save():
# Create Bucket so that test can run
conn = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
conn.create_bucket(Bucket="mybucket")
####################################
model_instance = MyModel("steve", "is awesome")
model_instance.save()
body = conn.Object("mybucket", "steve").get()["Body"].read().decode()
assert body == "is awesome"
@mock_s3
def test_object_metadata():
"""Metadata keys can contain certain special characters like dash and dot"""
# Create Bucket so that test can run
conn = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
conn.create_bucket(Bucket="mybucket")
####################################
metadata = {"meta": "simple", "my-meta": "dash", "meta.data": "namespaced"}
model_instance = MyModel("steve", "is awesome", metadata=metadata)
model_instance.save()
meta = conn.Object("mybucket", "steve").get()["Metadata"]
assert meta == metadata
@mock_s3
def test_resource_get_object_returns_etag():
conn = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
conn.create_bucket(Bucket="mybucket")
model_instance = MyModel("steve", "is awesome")
model_instance.save()
assert conn.Bucket("mybucket").Object("steve").e_tag == (
'"d32bda93738f7e03adb22e66c90fbc04"'
)
@mock_s3
def test_key_save_to_missing_bucket():
s3_resource = boto3.resource("s3")
key = s3_resource.Object("mybucket", "the-key")
with pytest.raises(ClientError) as ex:
key.put(Body=b"foobar")
assert ex.value.response["Error"]["Code"] == "NoSuchBucket"
assert ex.value.response["Error"]["Message"] == (
"The specified bucket does not exist"
)
@mock_s3
def test_missing_key_request():
if settings.TEST_SERVER_MODE:
raise SkipTest("Only test status code in non-ServerMode")
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="foobar")
response = requests.get("http://foobar.s3.amazonaws.com/the-key")
assert response.status_code == 404
@mock_s3
def test_empty_key():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
key = s3_resource.Object("foobar", "the-key")
key.put(Body=b"")
resp = client.get_object(Bucket="foobar", Key="the-key")
assert resp["ContentLength"] == 0
assert resp["Body"].read() == b""
@mock_s3
def test_key_name_encoding_in_listing():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
name = "6T7\x159\x12\r\x08.txt"
key = s3_resource.Object("foobar", name)
key.put(Body=b"")
key_received = client.list_objects(Bucket="foobar")["Contents"][0]["Key"]
assert key_received == name
key_received = client.list_objects_v2(Bucket="foobar")["Contents"][0]["Key"]
assert key_received == name
name = "example/file.text"
client.put_object(Bucket="foobar", Key=name, Body=b"")
key_received = client.list_objects(
Bucket="foobar", Prefix="example/", Delimiter="/", MaxKeys=1, EncodingType="url"
)["Contents"][0]["Key"]
assert key_received == name
@mock_s3
def test_empty_key_set_on_existing_key():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
key = s3_resource.Object("foobar", "the-key")
key.put(Body=b"some content")
resp = client.get_object(Bucket="foobar", Key="the-key")
assert resp["ContentLength"] == 12
assert resp["Body"].read() == b"some content"
key.put(Body=b"")
resp = client.get_object(Bucket="foobar", Key="the-key")
assert resp["ContentLength"] == 0
assert resp["Body"].read() == b""
@mock_s3
def test_large_key_save():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
key = s3_resource.Object("foobar", "the-key")
key.put(Body=b"foobar" * 100000)
resp = client.get_object(Bucket="foobar", Key="the-key")
assert resp["Body"].read() == b"foobar" * 100000
@mock_s3
def test_set_metadata():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
key = s3_resource.Object("foobar", "the-key")
key.put(Body=b"some value", Metadata={"md": "Metadatastring"})
resp = client.get_object(Bucket="foobar", Key="the-key")
assert resp["Metadata"] == {"md": "Metadatastring"}
@freeze_time("2012-01-01 12:00:00")
@mock_s3
def test_last_modified():
# See https://github.com/boto/boto/issues/466
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
key = s3_resource.Object("foobar", "the-key")
key.put(Body=b"some value", Metadata={"md": "Metadatastring"})
resp = client.list_objects_v2(Bucket="foobar")["Contents"]
assert isinstance(resp[0]["LastModified"], datetime.datetime)
resp = client.get_object(Bucket="foobar", Key="the-key")
assert isinstance(resp["LastModified"], datetime.datetime)
as_header = resp["ResponseMetadata"]["HTTPHeaders"]["last-modified"]
assert isinstance(as_header, str)
if not settings.TEST_SERVER_MODE:
assert as_header == "Sun, 01 Jan 2012 12:00:00 GMT"
@mock_s3
def test_missing_bucket():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
with pytest.raises(ClientError) as ex:
client.head_bucket(Bucket="mybucket")
assert ex.value.response["Error"]["Code"] == "404"
assert ex.value.response["Error"]["Message"] == "Not Found"
with pytest.raises(ClientError) as ex:
client.head_bucket(Bucket="dash-in-name")
assert ex.value.response["Error"]["Code"] == "404"
assert ex.value.response["Error"]["Message"] == "Not Found"
@mock_s3
def test_create_existing_bucket():
"""Creating a bucket that already exists should raise an Error."""
client = boto3.client("s3", region_name="us-west-2")
kwargs = {
"Bucket": "foobar",
"CreateBucketConfiguration": {"LocationConstraint": "us-west-2"},
}
client.create_bucket(**kwargs)
with pytest.raises(ClientError) as ex:
client.create_bucket(**kwargs)
assert ex.value.response["Error"]["Code"] == "BucketAlreadyOwnedByYou"
assert ex.value.response["Error"]["Message"] == (
"Your previous request to create the named bucket succeeded and you already own it."
)
@mock_s3
def test_create_existing_bucket_in_us_east_1():
"""Creating a bucket that already exists in us-east-1 returns the bucket.
http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
Your previous request to create the named bucket succeeded and you already
own it. You get this error in all AWS regions except US Standard,
us-east-1. In us-east-1 region, you will get 200 OK, but it is no-op (if
bucket exists it Amazon S3 will not do anything).
"""
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="foobar")
client.create_bucket(Bucket="foobar")
@mock_s3
def test_bucket_deletion():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="foobar")
key = s3_resource.Object("foobar", "the-key")
key.put(Body=b"some value")
# Try to delete a bucket that still has keys
with pytest.raises(ClientError) as ex:
client.delete_bucket(Bucket="foobar")
assert ex.value.response["Error"]["Code"] == "BucketNotEmpty"
assert ex.value.response["Error"]["Message"] == (
"The bucket you tried to delete is not empty"
)
client.delete_object(Bucket="foobar", Key="the-key")
client.delete_bucket(Bucket="foobar")
# Delete non-existent bucket
with pytest.raises(ClientError) as ex:
client.delete_bucket(Bucket="foobar")
assert ex.value.response["Error"]["Code"] == "NoSuchBucket"
assert ex.value.response["Error"]["Message"] == (
"The specified bucket does not exist"
)
@mock_s3
def test_get_all_buckets():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="foobar")
client.create_bucket(Bucket="foobar2")
assert len(client.list_buckets()["Buckets"]) == 2
@mock_s3
def test_post_to_bucket():
if settings.TEST_SERVER_MODE:
# ServerMode does not allow unauthorized requests
raise SkipTest()
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="foobar")
requests.post(
"https://foobar.s3.amazonaws.com/", {"key": "the-key", "file": "nothing"}
)
resp = client.get_object(Bucket="foobar", Key="the-key")
assert resp["Body"].read() == b"nothing"
@mock_s3
def test_post_with_metadata_to_bucket():
if settings.TEST_SERVER_MODE:
# ServerMode does not allow unauthorized requests
raise SkipTest()
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="foobar")
requests.post(
"https://foobar.s3.amazonaws.com/",
{"key": "the-key", "file": "nothing", "x-amz-meta-test": "metadata"},
)
resp = client.get_object(Bucket="foobar", Key="the-key")
assert resp["Metadata"] == {"test": "metadata"}
@mock_s3
def test_delete_versioned_objects():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = "test"
key = "test"
s3_client.create_bucket(Bucket=bucket)
s3_client.put_object(Bucket=bucket, Key=key, Body=b"")
s3_client.put_bucket_versioning(
Bucket=bucket, VersioningConfiguration={"Status": "Enabled"}
)
objects = s3_client.list_objects_v2(Bucket=bucket).get("Contents")
versions = s3_client.list_object_versions(Bucket=bucket).get("Versions")
delete_markers = s3_client.list_object_versions(Bucket=bucket).get("DeleteMarkers")
assert len(objects) == 1
assert len(versions) == 1
assert delete_markers is None
s3_client.delete_object(Bucket=bucket, Key=key)
objects = s3_client.list_objects_v2(Bucket=bucket).get("Contents")
versions = s3_client.list_object_versions(Bucket=bucket).get("Versions")
delete_markers = s3_client.list_object_versions(Bucket=bucket).get("DeleteMarkers")
assert objects is None
assert len(versions) == 1
assert len(delete_markers) == 1
s3_client.delete_object(
Bucket=bucket, Key=key, VersionId=versions[0].get("VersionId")
)
objects = s3_client.list_objects_v2(Bucket=bucket).get("Contents")
versions = s3_client.list_object_versions(Bucket=bucket).get("Versions")
delete_markers = s3_client.list_object_versions(Bucket=bucket).get("DeleteMarkers")
assert objects is None
assert versions is None
assert len(delete_markers) == 1
s3_client.delete_object(
Bucket=bucket, Key=key, VersionId=delete_markers[0].get("VersionId")
)
objects = s3_client.list_objects_v2(Bucket=bucket).get("Contents")
versions = s3_client.list_object_versions(Bucket=bucket).get("Versions")
delete_markers = s3_client.list_object_versions(Bucket=bucket).get("DeleteMarkers")
assert objects is None
assert versions is None
assert delete_markers is None
@mock_s3
def test_delete_missing_key():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
s3_resource.Object("foobar", "key1").put(Body=b"some value")
s3_resource.Object("foobar", "key2").put(Body=b"some value")
s3_resource.Object("foobar", "key3").put(Body=b"some value")
s3_resource.Object("foobar", "key4").put(Body=b"some value")
result = bucket.delete_objects(
Delete={
"Objects": [
{"Key": "unknown"},
{"Key": "key1"},
{"Key": "key3"},
{"Key": "typo"},
]
}
)
assert result["Deleted"] == (
[{"Key": "unknown"}, {"Key": "key1"}, {"Key": "key3"}, {"Key": "typo"}]
)
assert "Errors" not in result
objects = list(bucket.objects.all())
assert {o.key for o in objects} == set(["key2", "key4"])
@mock_s3
def test_delete_empty_keys_list():
with pytest.raises(ClientError) as err:
boto3.client("s3").delete_objects(Bucket="foobar", Delete={"Objects": []})
assert err.value.response["Error"]["Code"] == "MalformedXML"
@pytest.mark.parametrize("name", ["firstname.lastname", "with-dash"])
@mock_s3
def test_bucket_name_with_special_chars(name):
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket(name)
bucket.create()
s3_resource.Object(name, "the-key").put(Body=b"some value")
resp = client.get_object(Bucket=name, Key="the-key")
assert resp["Body"].read() == b"some value"
@pytest.mark.parametrize(
"key", ["normal", "test_list_keys_2/x?y", "/the-key-unîcode/test"]
)
@mock_s3
def test_key_with_special_characters(key):
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("testname")
bucket.create()
s3_resource.Object("testname", key).put(Body=b"value")
objects = list(bucket.objects.all())
assert [o.key for o in objects] == [key]
resp = client.get_object(Bucket="testname", Key=key)
assert resp["Body"].read() == b"value"
@mock_s3
def test_bucket_key_listing_order():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "test_bucket"
bucket = s3_resource.Bucket(bucket_name)
bucket.create()
prefix = "toplevel/"
names = ["x/key", "y.key1", "y.key2", "y.key3", "x/y/key", "x/y/z/key"]
for name in names:
s3_resource.Object(bucket_name, prefix + name).put(Body=b"somedata")
delimiter = ""
keys = [x.key for x in bucket.objects.filter(Prefix=prefix, Delimiter=delimiter)]
assert keys == [
"toplevel/x/key",
"toplevel/x/y/key",
"toplevel/x/y/z/key",
"toplevel/y.key1",
"toplevel/y.key2",
"toplevel/y.key3",
]
delimiter = "/"
keys = [x.key for x in bucket.objects.filter(Prefix=prefix, Delimiter=delimiter)]
assert keys == ["toplevel/y.key1", "toplevel/y.key2", "toplevel/y.key3"]
# Test delimiter with no prefix
keys = [x.key for x in bucket.objects.filter(Delimiter=delimiter)]
assert keys == []
prefix = "toplevel/x"
keys = [x.key for x in bucket.objects.filter(Prefix=prefix)]
assert keys == ["toplevel/x/key", "toplevel/x/y/key", "toplevel/x/y/z/key"]
keys = [x.key for x in bucket.objects.filter(Prefix=prefix, Delimiter=delimiter)]
assert keys == []
@mock_s3
def test_key_with_reduced_redundancy():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "test_bucket"
bucket = s3_resource.Bucket(bucket_name)
bucket.create()
bucket.put_object(
Key="test_rr_key", Body=b"somedata", StorageClass="REDUCED_REDUNDANCY"
)
# we use the bucket iterator because of:
# https:/github.com/boto/boto/issues/1173
assert [x.storage_class for x in bucket.objects.all()] == ["REDUCED_REDUNDANCY"]
@freeze_time("2012-01-01 12:00:00")
@mock_s3
def test_restore_key():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
key = bucket.put_object(Key="the-key", Body=b"somedata", StorageClass="GLACIER")
assert key.restore is None
key.restore_object(RestoreRequest={"Days": 1})
if settings.TEST_SERVER_MODE:
assert 'ongoing-request="false"' in key.restore
else:
assert key.restore == (
'ongoing-request="false", expiry-date="Mon, 02 Jan 2012 12:00:00 GMT"'
)
key.restore_object(RestoreRequest={"Days": 2})
if settings.TEST_SERVER_MODE:
assert 'ongoing-request="false"' in key.restore
else:
assert key.restore == (
'ongoing-request="false", expiry-date="Tue, 03 Jan 2012 12:00:00 GMT"'
)
@freeze_time("2012-01-01 12:00:00")
@mock_s3
def test_restore_key_transition():
if settings.TEST_SERVER_MODE:
raise SkipTest("Can't set transition directly in ServerMode")
state_manager.set_transition(
model_name="s3::keyrestore", transition={"progression": "manual", "times": 1}
)
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
key = bucket.put_object(Key="the-key", Body=b"somedata", StorageClass="GLACIER")
assert key.restore is None
key.restore_object(RestoreRequest={"Days": 1})
# first call: there should be an ongoing request
assert 'ongoing-request="true"' in key.restore
# second call: request should be done
key.load()
assert 'ongoing-request="false"' in key.restore
# third call: request should still be done
key.load()
assert 'ongoing-request="false"' in key.restore
state_manager.unset_transition(model_name="s3::keyrestore")
@mock_s3
def test_cannot_restore_standard_class_object():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
key = bucket.put_object(Key="the-key", Body=b"somedata")
with pytest.raises(Exception) as err:
key.restore_object(RestoreRequest={"Days": 1})
err = err.value.response["Error"]
assert err["Code"] == "InvalidObjectState"
assert err["StorageClass"] == "STANDARD"
assert err["Message"] == (
"The operation is not valid for the object's storage class"
)
@mock_s3
def test_get_versioning_status():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
version_info = s3_resource.BucketVersioning("foobar")
assert version_info.status is None
version_info.enable()
assert version_info.status == "Enabled"
version_info.suspend()
assert version_info.status == "Suspended"
@mock_s3
def test_key_version():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
bucket.Versioning().enable()
versions = []
key = bucket.put_object(Key="the-key", Body=b"somedata")
versions.append(key.version_id)
key.put(Body=b"some string")
versions.append(key.version_id)
assert len(set(versions)) == 2
key = client.get_object(Bucket="foobar", Key="the-key")
assert key["VersionId"] == versions[-1]
@mock_s3
def test_list_versions():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
bucket.Versioning().enable()
key_versions = []
key = bucket.put_object(Key="the-key", Body=b"Version 1")
key_versions.append(key.version_id)
key = bucket.put_object(Key="the-key", Body=b"Version 2")
key_versions.append(key.version_id)
assert len(key_versions) == 2
versions = client.list_object_versions(Bucket="foobar")["Versions"]
assert len(versions) == 2
assert versions[0]["Key"] == "the-key"
assert versions[0]["VersionId"] == key_versions[1]
resp = client.get_object(Bucket="foobar", Key="the-key")
assert resp["Body"].read() == b"Version 2"
resp = client.get_object(
Bucket="foobar", Key="the-key", VersionId=versions[0]["VersionId"]
)
assert resp["Body"].read() == b"Version 2"
assert versions[1]["Key"] == "the-key"
assert versions[1]["VersionId"] == key_versions[0]
resp = client.get_object(
Bucket="foobar", Key="the-key", VersionId=versions[1]["VersionId"]
)
assert resp["Body"].read() == b"Version 1"
bucket.put_object(Key="the2-key", Body=b"Version 1")
assert len(list(bucket.objects.all())) == 2
versions = client.list_object_versions(Bucket="foobar", Prefix="the2")["Versions"]
assert len(versions) == 1
@mock_s3
def test_acl_setting():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
content = b"imafile"
keyname = "test.txt"
bucket.put_object(
Key=keyname, Body=content, ContentType="text/plain", ACL="public-read"
)
grants = client.get_object_acl(Bucket="foobar", Key=keyname)["Grants"]
assert {
"Grantee": {
"Type": "Group",
"URI": "http://acs.amazonaws.com/groups/global/AllUsers",
},
"Permission": "READ",
} in grants
@mock_s3
def test_acl_setting_via_headers():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
keyname = "test.txt"
bucket.put_object(Key=keyname, Body=b"imafile")
client.put_object_acl(ACL="public-read", Bucket="foobar", Key=keyname)
grants = client.get_object_acl(Bucket="foobar", Key=keyname)["Grants"]
assert {
"Grantee": {
"Type": "Group",
"URI": "http://acs.amazonaws.com/groups/global/AllUsers",
},
"Permission": "READ",
} in grants
@mock_s3
def test_acl_switching():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("foobar")
bucket.create()
keyname = "test.txt"
bucket.put_object(Key=keyname, Body=b"asdf", ACL="public-read")
client.put_object_acl(ACL="private", Bucket="foobar", Key=keyname)
grants = client.get_object_acl(Bucket="foobar", Key=keyname)["Grants"]
assert {
"Grantee": {
"Type": "Group",
"URI": "http://acs.amazonaws.com/groups/global/AllUsers",
},
"Permission": "READ",
} not in grants
@mock_s3
def test_acl_switching_nonexistent_key():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
with pytest.raises(ClientError) as exc:
s3_client.put_object_acl(Bucket="mybucket", Key="nonexistent", ACL="private")
assert exc.value.response["Error"]["Code"] == "NoSuchKey"
@mock_s3
def test_streaming_upload_from_file_to_presigned_url():
s3_resource = boto3.resource("s3", region_name="us-east-1")
bucket = s3_resource.Bucket("test-bucket")
bucket.create()
bucket.put_object(Body=b"ABCD", Key="file.txt")
params = {"Bucket": "test-bucket", "Key": "file.txt"}
presigned_url = boto3.client("s3").generate_presigned_url(
"put_object", params, ExpiresIn=900
)
with open(__file__, "rb") as fhandle:
response = requests.get(presigned_url, data=fhandle)
assert response.status_code == 200
@mock_s3
def test_upload_from_file_to_presigned_url():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
params = {"Bucket": "mybucket", "Key": "file_upload"}
presigned_url = boto3.client("s3").generate_presigned_url(
"put_object", params, ExpiresIn=900
)
file = open("text.txt", "w", encoding="utf-8")
file.write("test")
file.close()
files = {"upload_file": open("text.txt", "rb")}
requests.put(presigned_url, files=files)
resp = s3_client.get_object(Bucket="mybucket", Key="file_upload")
data = resp["Body"].read()
assert data == b"test"
# cleanup
os.remove("text.txt")
@mock_s3
def test_upload_file_with_checksum_algorithm():
random_bytes = (
b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00"
b"\x00\xff\n\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\x07\n"
)
with open("rb.tmp", mode="wb") as fhandle:
fhandle.write(random_bytes)
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket = "mybucket"
s3_client.create_bucket(Bucket=bucket)
s3_client.upload_file(
"rb.tmp", bucket, "my_key.csv", ExtraArgs={"ChecksumAlgorithm": "SHA256"}
)
os.remove("rb.tmp")
actual_content = s3_resource.Object(bucket, "my_key.csv").get()["Body"].read()
assert random_bytes == actual_content
@mock_s3
def test_put_chunked_with_v4_signature_in_body():
bucket_name = "mybucket"
file_name = "file"
content = "CONTENT"
content_bytes = bytes(content, encoding="utf8")
# 'CONTENT' as received in moto, when PutObject is called in java AWS SDK v2
chunked_body = (
b"7;chunk-signature=bd479c607ec05dd9d570893f74eed76a4b333dfa37ad"
b"6446f631ec47dc52e756\r\nCONTENT\r\n"
b"0;chunk-signature=d192ec4075ddfc18d2ef4da4f55a87dc762ba4417b3b"
b"d41e70c282f8bec2ece0\r\n\r\n"
)
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
model = MyModel(file_name, content)
model.save()
boto_etag = s3_client.get_object(Bucket=bucket_name, Key=file_name)["ETag"]
params = {"Bucket": bucket_name, "Key": file_name}
# We'll use manipulated presigned PUT, to mimick PUT from SDK
presigned_url = boto3.client("s3").generate_presigned_url(
"put_object", params, ExpiresIn=900
)
requests.put(
presigned_url,
data=chunked_body,
headers={
"Content-Type": "application/octet-stream",
"x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",
"x-amz-decoded-content-length": str(len(content_bytes)),
},
)
resp = s3_client.get_object(Bucket=bucket_name, Key=file_name)
body = resp["Body"].read()
assert body == content_bytes
etag = resp["ETag"]
assert etag == boto_etag
@mock_s3
def test_s3_object_in_private_bucket():
s3_resource = boto3.resource("s3")
bucket = s3_resource.Bucket("test-bucket")
bucket.create(
ACL="private", CreateBucketConfiguration={"LocationConstraint": "us-west-1"}
)
bucket.put_object(ACL="private", Body=b"ABCD", Key="file.txt")
s3_anonymous = boto3.resource("s3")
s3_anonymous.meta.client.meta.events.register("choose-signer.s3.*", disable_signing)
with pytest.raises(ClientError) as exc:
s3_anonymous.Object(key="file.txt", bucket_name="test-bucket").get()
assert exc.value.response["Error"]["Code"] == "403"
bucket.put_object(ACL="public-read", Body=b"ABCD", Key="file.txt")
contents = (
s3_anonymous.Object(key="file.txt", bucket_name="test-bucket")
.get()["Body"]
.read()
)
assert contents == b"ABCD"
@mock_s3
def test_unicode_key():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("mybucket")
bucket.create()
key = bucket.put_object(Key="こんにちは.jpg", Body=b"Hello world!")
assert [listed_key.key for listed_key in bucket.objects.all()] == [key.key]
fetched_key = s3_resource.Object("mybucket", key.key)
assert fetched_key.key == key.key
assert fetched_key.get()["Body"].read().decode("utf-8") == "Hello world!"
@mock_s3
def test_unicode_value():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("mybucket")
bucket.create()
bucket.put_object(Key="some_key", Body="こんにちは.jpg")
key = s3_resource.Object("mybucket", "some_key")
assert key.get()["Body"].read().decode("utf-8") == "こんにちは.jpg"
@mock_s3
def test_setting_content_encoding():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("mybucket")
bucket.create()
bucket.put_object(Body=b"abcdef", ContentEncoding="gzip", Key="keyname")
key = s3_resource.Object("mybucket", "keyname")
assert key.content_encoding == "gzip"
@mock_s3
def test_bucket_location_default():
cli = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
# No LocationConstraint ==> us-east-1
cli.create_bucket(Bucket=bucket_name)
assert cli.get_bucket_location(Bucket=bucket_name)["LocationConstraint"] is None
@mock_s3
def test_bucket_location_nondefault():
cli = boto3.client("s3", region_name="eu-central-1")
bucket_name = "mybucket"
# LocationConstraint set for non default regions
cli.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},
)
assert (
cli.get_bucket_location(Bucket=bucket_name)["LocationConstraint"]
== "eu-central-1"
)
@mock_s3
def test_s3_location_should_error_outside_useast1():
s3_client = boto3.client("s3", region_name="eu-west-1")
bucket_name = "asdfasdfsdfdsfasda"
with pytest.raises(ClientError) as exc:
s3_client.create_bucket(Bucket=bucket_name)
assert exc.value.response["Error"]["Message"] == (
"The unspecified location constraint is incompatible for the "
"region specific endpoint this request was sent to."
)
@mock_s3
def test_ranged_get():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.Bucket("mybucket")
bucket.create()
rep = b"0123456789"
key = bucket.put_object(Key="bigkey", Body=rep * 10)
# Implicitly bounded range requests.
assert key.get(Range="bytes=0-")["Body"].read() == rep * 10
assert key.get(Range="bytes=50-")["Body"].read() == rep * 5
assert key.get(Range="bytes=99-")["Body"].read() == b"9"
# Explicitly bounded range requests starting from the first byte.
assert key.get(Range="bytes=0-0")["Body"].read() == b"0"
assert key.get(Range="bytes=0-49")["Body"].read() == rep * 5
assert key.get(Range="bytes=0-99")["Body"].read() == rep * 10
assert key.get(Range="bytes=0-100")["Body"].read() == rep * 10
assert key.get(Range="bytes=0-700")["Body"].read() == rep * 10
# Explicitly bounded range requests starting from the / a middle byte.
assert key.get(Range="bytes=50-54")["Body"].read() == rep[:5]
assert key.get(Range="bytes=50-99")["Body"].read() == rep * 5
assert key.get(Range="bytes=50-100")["Body"].read() == rep * 5
assert key.get(Range="bytes=50-700")["Body"].read() == rep * 5
# Explicitly bounded range requests starting from the last byte.
assert key.get(Range="bytes=99-99")["Body"].read() == b"9"
assert key.get(Range="bytes=99-100")["Body"].read() == b"9"
assert key.get(Range="bytes=99-700")["Body"].read() == b"9"
# Suffix range requests.
assert key.get(Range="bytes=-1")["Body"].read() == b"9"
assert key.get(Range="bytes=-60")["Body"].read() == rep * 6
assert key.get(Range="bytes=-100")["Body"].read() == rep * 10
assert key.get(Range="bytes=-101")["Body"].read() == rep * 10
assert key.get(Range="bytes=-700")["Body"].read() == rep * 10
assert key.content_length == 100
@mock_s3
def test_policy():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
bucket = s3_resource.Bucket(bucket_name)
bucket.create()
policy = json.dumps(
{
"Version": "2012-10-17",
"Id": "PutObjPolicy",
"Statement": [
{
"Sid": "DenyUnEncryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": f"arn:aws:s3:::{bucket_name}/*",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "aws:kms"
}
},
}
],
}
)
with pytest.raises(ClientError) as ex:
client.get_bucket_policy(Bucket=bucket_name)
assert ex.value.response["Error"]["Code"] == "NoSuchBucketPolicy"
assert ex.value.response["Error"]["Message"] == "The bucket policy does not exist"
client.put_bucket_policy(Bucket=bucket_name, Policy=policy)
assert client.get_bucket_policy(Bucket=bucket_name)["Policy"] == policy
client.delete_bucket_policy(Bucket=bucket_name)
with pytest.raises(ClientError) as ex:
client.get_bucket_policy(Bucket=bucket_name)
assert ex.value.response["Error"]["Code"] == "NoSuchBucketPolicy"
@mock_s3
def test_website_configuration_xml():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
bucket = s3_resource.Bucket(bucket_name)
bucket.create()
client.put_bucket_website(
Bucket=bucket_name,
WebsiteConfiguration={
"IndexDocument": {"Suffix": "index.html"},
"RoutingRules": [
{
"Condition": {"KeyPrefixEquals": "test/testing"},
"Redirect": {"ReplaceKeyWith": "test.txt"},
}
],
},
)
site_info = client.get_bucket_website(Bucket=bucket_name)
assert site_info["IndexDocument"] == {"Suffix": "index.html"}
assert "RoutingRules" in site_info
assert len(site_info["RoutingRules"]) == 1
rule = site_info["RoutingRules"][0]
assert rule["Condition"] == {"KeyPrefixEquals": "test/testing"}
assert rule["Redirect"] == {"ReplaceKeyWith": "test.txt"}
assert "RedirectAllRequestsTo" not in site_info
assert "ErrorDocument" not in site_info
@mock_s3
def test_client_get_object_returns_etag():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
s3_client.put_object(Bucket="mybucket", Key="steve", Body=b"is awesome")
resp = s3_client.get_object(Bucket="mybucket", Key="steve")
assert resp["ETag"] == '"d32bda93738f7e03adb22e66c90fbc04"'
@mock_s3
def test_website_redirect_location():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
s3_client.put_object(Bucket="mybucket", Key="steve", Body=b"is awesome")
resp = s3_client.get_object(Bucket="mybucket", Key="steve")
assert resp.get("WebsiteRedirectLocation") is None
url = "https://github.com/getmoto/moto"
s3_client.put_object(
Bucket="mybucket", Key="steve", Body=b"is awesome", WebsiteRedirectLocation=url
)
resp = s3_client.get_object(Bucket="mybucket", Key="steve")
assert resp["WebsiteRedirectLocation"] == url
@mock_s3
def test_delimiter_optional_in_response():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
s3_client.put_object(Bucket="mybucket", Key="one", Body=b"1")
resp = s3_client.list_objects(Bucket="mybucket", MaxKeys=1)
assert resp.get("Delimiter") is None
resp = s3_client.list_objects(Bucket="mybucket", MaxKeys=1, Delimiter="/")
assert resp.get("Delimiter") == "/"
@mock_s3
def test_list_objects_with_pagesize_0():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
resp = s3_client.list_objects(Bucket="mybucket", MaxKeys=0)
assert resp["Name"] == "mybucket"
assert resp["MaxKeys"] == 0
assert resp["IsTruncated"] is False
assert "Contents" not in resp
@mock_s3
def test_list_objects_truncated_response():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
s3_client.put_object(Bucket="mybucket", Key="one", Body=b"1")
s3_client.put_object(Bucket="mybucket", Key="two", Body=b"22")
s3_client.put_object(Bucket="mybucket", Key="three", Body=b"333")
# First list
resp = s3_client.list_objects(Bucket="mybucket", MaxKeys=1)
listed_object = resp["Contents"][0]
assert listed_object["Key"] == "one"
assert resp["MaxKeys"] == 1
assert resp["IsTruncated"] is True
assert resp.get("Prefix") is None
assert resp.get("Delimiter") is None
assert "NextMarker" in resp
next_marker = resp["NextMarker"]
# Second list
resp = s3_client.list_objects(Bucket="mybucket", MaxKeys=1, Marker=next_marker)
listed_object = resp["Contents"][0]
assert listed_object["Key"] == "three"
assert resp["MaxKeys"] == 1
assert resp["IsTruncated"] is True
assert resp.get("Prefix") is None
assert resp.get("Delimiter") is None
assert "NextMarker" in resp
next_marker = resp["NextMarker"]
# Third list
resp = s3_client.list_objects(Bucket="mybucket", MaxKeys=1, Marker=next_marker)
listed_object = resp["Contents"][0]
assert listed_object["Key"] == "two"
assert resp["MaxKeys"] == 1
assert resp["IsTruncated"] is False
assert resp.get("Prefix") is None
assert resp.get("Delimiter") is None
assert "NextMarker" not in resp
@mock_s3
def test_list_keys_xml_escaped():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
key_name = "Q&A.txt"
s3_client.put_object(Bucket="mybucket", Key=key_name, Body=b"is awesome")
resp = s3_client.list_objects_v2(Bucket="mybucket", Prefix=key_name)
assert resp["Contents"][0]["Key"] == key_name
assert resp["KeyCount"] == 1
assert resp["MaxKeys"] == 1000
assert resp["Prefix"] == key_name
assert resp["IsTruncated"] is False
assert "Delimiter" not in resp
assert "StartAfter" not in resp
assert "NextContinuationToken" not in resp
assert "Owner" not in resp["Contents"][0]
@mock_s3
def test_list_objects_v2_common_prefix_pagination():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
max_keys = 1
keys = [f"test/{i}/{i}" for i in range(3)]
for key in keys:
s3_client.put_object(Bucket="mybucket", Key=key, Body=b"v")
prefixes = []
args = {
"Bucket": "mybucket",
"Delimiter": "/",
"Prefix": "test/",
"MaxKeys": max_keys,
}
resp = {"IsTruncated": True}
while resp.get("IsTruncated", False):
if "NextContinuationToken" in resp:
args["ContinuationToken"] = resp["NextContinuationToken"]
resp = s3_client.list_objects_v2(**args)
if "CommonPrefixes" in resp:
assert len(resp["CommonPrefixes"]) == max_keys
prefixes.extend(i["Prefix"] for i in resp["CommonPrefixes"])
assert prefixes == [k[: k.rindex("/") + 1] for k in keys]
@mock_s3
def test_list_objects_v2_common_invalid_continuation_token():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
max_keys = 1
keys = [f"test/{i}/{i}" for i in range(3)]
for key in keys:
s3_client.put_object(Bucket="mybucket", Key=key, Body=b"v")
args = {
"Bucket": "mybucket",
"Delimiter": "/",
"Prefix": "test/",
"MaxKeys": max_keys,
"ContinuationToken": "",
}
with pytest.raises(botocore.exceptions.ClientError) as exc:
s3_client.list_objects_v2(**args)
assert exc.value.response["Error"]["Code"] == "InvalidArgument"
assert exc.value.response["Error"]["Message"] == (
"The continuation token provided is incorrect"
)
@mock_s3
def test_list_objects_v2_truncated_response():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
s3_client.put_object(Bucket="mybucket", Key="one", Body=b"1")
s3_client.put_object(Bucket="mybucket", Key="two", Body=b"22")
s3_client.put_object(Bucket="mybucket", Key="three", Body=b"333")
# First list
resp = s3_client.list_objects_v2(Bucket="mybucket", MaxKeys=1)
listed_object = resp["Contents"][0]
assert listed_object["Key"] == "one"
assert resp["MaxKeys"] == 1
assert resp["Prefix"] == ""
assert resp["KeyCount"] == 1
assert resp["IsTruncated"] is True
assert "Delimiter" not in resp
assert "StartAfter" not in resp
assert "Owner" not in listed_object # owner info was not requested
next_token = resp["NextContinuationToken"]
# Second list
resp = s3_client.list_objects_v2(
Bucket="mybucket", MaxKeys=1, ContinuationToken=next_token
)
listed_object = resp["Contents"][0]
assert listed_object["Key"] == "three"
assert resp["MaxKeys"] == 1
assert resp["Prefix"] == ""
assert resp["KeyCount"] == 1
assert resp["IsTruncated"] is True
assert "Delimiter" not in resp
assert "StartAfter" not in resp
assert "Owner" not in listed_object
next_token = resp["NextContinuationToken"]
# Third list
resp = s3_client.list_objects_v2(
Bucket="mybucket", MaxKeys=1, ContinuationToken=next_token
)
listed_object = resp["Contents"][0]
assert listed_object["Key"] == "two"
assert resp["MaxKeys"] == 1
assert resp["Prefix"] == ""
assert resp["KeyCount"] == 1
assert resp["IsTruncated"] is False
assert "Delimiter" not in resp
assert "Owner" not in listed_object
assert "StartAfter" not in resp
assert "NextContinuationToken" not in resp
@mock_s3
def test_list_objects_v2_truncated_response_start_after():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
s3_client.put_object(Bucket="mybucket", Key="one", Body=b"1")
s3_client.put_object(Bucket="mybucket", Key="two", Body=b"22")
s3_client.put_object(Bucket="mybucket", Key="three", Body=b"333")
# First list
resp = s3_client.list_objects_v2(Bucket="mybucket", MaxKeys=1, StartAfter="one")
listed_object = resp["Contents"][0]
assert listed_object["Key"] == "three"
assert resp["MaxKeys"] == 1
assert resp["Prefix"] == ""
assert resp["KeyCount"] == 1
assert resp["IsTruncated"] is True
assert resp["StartAfter"] == "one"
assert "Delimiter" not in resp
assert "Owner" not in listed_object
next_token = resp["NextContinuationToken"]
# Second list
# The ContinuationToken must take precedence over StartAfter.
resp = s3_client.list_objects_v2(
Bucket="mybucket", MaxKeys=1, StartAfter="one", ContinuationToken=next_token
)
listed_object = resp["Contents"][0]
assert listed_object["Key"] == "two"
assert resp["MaxKeys"] == 1
assert resp["Prefix"] == ""
assert resp["KeyCount"] == 1
assert resp["IsTruncated"] is False
# When ContinuationToken is given, StartAfter is ignored. This also means
# AWS does not return it in the response.
assert "StartAfter" not in resp
assert "Delimiter" not in resp
assert "Owner" not in listed_object
@mock_s3
def test_list_objects_v2_fetch_owner():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
s3_client.put_object(Bucket="mybucket", Key="one", Body=b"11")
resp = s3_client.list_objects_v2(Bucket="mybucket", FetchOwner=True)
owner = resp["Contents"][0]["Owner"]
assert "ID" in owner
assert "DisplayName" in owner
assert len(owner.keys()) == 2
@mock_s3
def test_list_objects_v2_truncate_combined_keys_and_folders():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
s3_client.put_object(Bucket="mybucket", Key="1/2", Body="")
s3_client.put_object(Bucket="mybucket", Key="2", Body="")
s3_client.put_object(Bucket="mybucket", Key="3/4", Body="")
s3_client.put_object(Bucket="mybucket", Key="4", Body="")
resp = s3_client.list_objects_v2(
Bucket="mybucket", Prefix="", MaxKeys=2, Delimiter="/"
)
assert "Delimiter" in resp
assert resp["IsTruncated"] is True
assert resp["KeyCount"] == 2
assert len(resp["Contents"]) == 1
assert resp["Contents"][0]["Key"] == "2"
assert len(resp["CommonPrefixes"]) == 1
assert resp["CommonPrefixes"][0]["Prefix"] == "1/"
last_tail = resp["NextContinuationToken"]
resp = s3_client.list_objects_v2(
Bucket="mybucket", MaxKeys=2, Prefix="", Delimiter="/", StartAfter=last_tail
)
assert resp["KeyCount"] == 2
assert resp["IsTruncated"] is False
assert len(resp["Contents"]) == 1
assert resp["Contents"][0]["Key"] == "4"
assert len(resp["CommonPrefixes"]) == 1
assert resp["CommonPrefixes"][0]["Prefix"] == "3/"
@mock_s3
def test_list_objects_v2_checksum_algo():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
resp = s3_client.put_object(Bucket="mybucket", Key="0", Body="a")
assert "ChecksumCRC32" not in resp
assert "x-amz-sdk-checksum-algorithm" not in resp["ResponseMetadata"]["HTTPHeaders"]
resp = s3_client.put_object(
Bucket="mybucket", Key="1", Body="a", ChecksumAlgorithm="CRC32"
)
assert "ChecksumCRC32" in resp
assert (
resp["ResponseMetadata"]["HTTPHeaders"]["x-amz-sdk-checksum-algorithm"]
== "CRC32"
)
resp = s3_client.put_object(
Bucket="mybucket", Key="2", Body="b", ChecksumAlgorithm="SHA256"
)
assert "ChecksumSHA256" in resp
assert (
resp["ResponseMetadata"]["HTTPHeaders"]["x-amz-sdk-checksum-algorithm"]
== "SHA256"
)
resp = s3_client.list_objects_v2(Bucket="mybucket")["Contents"]
assert "ChecksumAlgorithm" not in resp[0]
assert resp[1]["ChecksumAlgorithm"] == ["CRC32"]
assert resp[2]["ChecksumAlgorithm"] == ["SHA256"]
@mock_s3
def test_bucket_create():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="blah")
s3_resource.Object("blah", "hello.txt").put(Body="some text")
assert (
s3_resource.Object("blah", "hello.txt").get()["Body"].read().decode("utf-8")
== "some text"
)
@mock_s3
def test_bucket_create_force_us_east_1():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
with pytest.raises(ClientError) as exc:
s3_resource.create_bucket(
Bucket="blah",
CreateBucketConfiguration={"LocationConstraint": DEFAULT_REGION_NAME},
)
assert exc.value.response["Error"]["Code"] == "InvalidLocationConstraint"
@mock_s3
def test_bucket_create_eu_central():
s3_resource = boto3.resource("s3", region_name="eu-central-1")
s3_resource.create_bucket(
Bucket="blah", CreateBucketConfiguration={"LocationConstraint": "eu-central-1"}
)
s3_resource.Object("blah", "hello.txt").put(Body="some text")
assert (
s3_resource.Object("blah", "hello.txt").get()["Body"].read().decode("utf-8")
== "some text"
)
@mock_s3
def test_bucket_create_empty_bucket_configuration_should_return_malformed_xml_error():
s3_resource = boto3.resource("s3", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
s3_resource.create_bucket(Bucket="whatever", CreateBucketConfiguration={})
assert exc.value.response["Error"]["Code"] == "MalformedXML"
assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400
@mock_s3
def test_head_object():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="blah")
s3_resource.Object("blah", "hello.txt").put(Body="some text")
s3_resource.Object("blah", "hello.txt").meta.client.head_object(
Bucket="blah", Key="hello.txt"
)
with pytest.raises(ClientError) as exc:
s3_resource.Object("blah", "hello2.txt").meta.client.head_object(
Bucket="blah", Key="hello_bad.txt"
)
assert exc.value.response["Error"]["Code"] == "404"
@mock_s3
def test_get_object():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="blah")
s3_resource.Object("blah", "hello.txt").put(Body="some text")
s3_resource.Object("blah", "hello.txt").meta.client.head_object(
Bucket="blah", Key="hello.txt"
)
with pytest.raises(ClientError) as exc:
s3_resource.Object("blah", "hello2.txt").get()
assert exc.value.response["Error"]["Code"] == "NoSuchKey"
@mock_s3
def test_s3_content_type():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
my_bucket = s3_resource.Bucket("my-cool-bucket")
my_bucket.create()
s3_path = "test_s3.py"
s3_resource = boto3.resource("s3", verify=False)
content_type = "text/python-x"
s3_resource.Object(my_bucket.name, s3_path).put(
ContentType=content_type, Body=b"some python code"
)
assert s3_resource.Object(my_bucket.name, s3_path).content_type == content_type
@mock_s3
def test_get_missing_object_with_part_number():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="blah")
with pytest.raises(ClientError) as exc:
s3_resource.Object("blah", "hello.txt").meta.client.head_object(
Bucket="blah", Key="hello.txt", PartNumber=123
)
assert exc.value.response["Error"]["Code"] == "404"
@mock_s3
def test_head_object_with_versioning():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket = s3_resource.create_bucket(Bucket="blah")
bucket.Versioning().enable()
old_content = "some text"
new_content = "some new text"
s3_resource.Object("blah", "hello.txt").put(Body=old_content)
s3_resource.Object("blah", "hello.txt").put(Body=new_content)
versions = list(s3_resource.Bucket("blah").object_versions.all())
latest = list(filter(lambda item: item.is_latest, versions))[0]
oldest = list(filter(lambda item: not item.is_latest, versions))[0]
head_object = s3_resource.Object("blah", "hello.txt").meta.client.head_object(
Bucket="blah", Key="hello.txt"
)
assert head_object["VersionId"] == latest.id
assert head_object["ContentLength"] == len(new_content)
old_head_object = s3_resource.Object("blah", "hello.txt").meta.client.head_object(
Bucket="blah", Key="hello.txt", VersionId=oldest.id
)
assert old_head_object["VersionId"] == oldest.id
assert old_head_object["ContentLength"] == len(old_content)
assert old_head_object["VersionId"] != head_object["VersionId"]
@mock_s3
def test_deleted_versionings_list():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="blah")
client.put_bucket_versioning(
Bucket="blah", VersioningConfiguration={"Status": "Enabled"}
)
client.put_object(Bucket="blah", Key="test1", Body=b"test1")
client.put_object(Bucket="blah", Key="test2", Body=b"test2")
client.delete_objects(Bucket="blah", Delete={"Objects": [{"Key": "test1"}]})
listed = client.list_objects_v2(Bucket="blah")
assert len(listed["Contents"]) == 1
@mock_s3
def test_delete_objects_for_specific_version_id():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="blah")
client.put_bucket_versioning(
Bucket="blah", VersioningConfiguration={"Status": "Enabled"}
)
client.put_object(Bucket="blah", Key="test1", Body=b"test1a")
client.put_object(Bucket="blah", Key="test1", Body=b"test1b")
response = client.list_object_versions(Bucket="blah", Prefix="test1")
id_to_delete = [v["VersionId"] for v in response["Versions"] if v["IsLatest"]][0]
response = client.delete_objects(
Bucket="blah", Delete={"Objects": [{"Key": "test1", "VersionId": id_to_delete}]}
)
assert response["Deleted"] == [{"Key": "test1", "VersionId": id_to_delete}]
listed = client.list_objects_v2(Bucket="blah")
assert len(listed["Contents"]) == 1
@mock_s3
def test_delete_versioned_bucket():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="blah")
client.put_bucket_versioning(
Bucket="blah", VersioningConfiguration={"Status": "Enabled"}
)
resp = client.put_object(Bucket="blah", Key="test1", Body=b"test1")
client.delete_object(Bucket="blah", Key="test1", VersionId=resp["VersionId"])
client.delete_bucket(Bucket="blah")
@mock_s3
def test_delete_versioned_bucket_returns_metadata():
# Verified against AWS
name = str(uuid4())
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket=name)
client.put_bucket_versioning(
Bucket=name, VersioningConfiguration={"Status": "Enabled"}
)
client.put_object(Bucket=name, Key="test1", Body=b"test1")
# We now have zero delete markers
assert "DeleteMarkers" not in client.list_object_versions(Bucket=name)
# Delete the object
del_file = client.delete_object(Bucket=name, Key="test1")
assert del_file["DeleteMarker"] is True
assert del_file["VersionId"] is not None
# We now have one DeleteMarker
assert len(client.list_object_versions(Bucket=name)["DeleteMarkers"]) == 1
# Delete the same object gives a new version id
del_mrk1 = client.delete_object(Bucket=name, Key="test1")
assert del_mrk1["DeleteMarker"] is True
assert del_mrk1["VersionId"] != del_file["VersionId"]
# We now have two DeleteMarkers
assert len(client.list_object_versions(Bucket=name)["DeleteMarkers"]) == 2
# Delete the delete marker
del_mrk2 = client.delete_object(
Bucket=name, Key="test1", VersionId=del_mrk1["VersionId"]
)
assert del_mrk2["DeleteMarker"] is True
assert del_mrk2["VersionId"] == del_mrk1["VersionId"]
# We now have only one DeleteMarker
assert len(client.list_object_versions(Bucket=name)["DeleteMarkers"]) == 1
# Delete the actual file
actual_version = client.list_object_versions(Bucket=name)["Versions"][0]
del_mrk3 = client.delete_object(
Bucket=name, Key="test1", VersionId=actual_version["VersionId"]
)
assert "DeleteMarker" not in del_mrk3
assert del_mrk3["VersionId"] == actual_version["VersionId"]
# We still have one DeleteMarker, but zero objects
assert len(client.list_object_versions(Bucket=name)["DeleteMarkers"]) == 1
assert "Versions" not in client.list_object_versions(Bucket=name)
# Delete the last marker
del_mrk4 = client.delete_object(
Bucket=name, Key="test1", VersionId=del_mrk2["VersionId"]
)
assert "DeleteMarker" not in del_mrk4
assert del_mrk4["VersionId"] == del_mrk2["VersionId"]
@mock_s3
def test_get_object_if_modified_since_refresh():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
response = s3_client.get_object(Bucket=bucket_name, Key=key)
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.get_object(
Bucket=bucket_name,
Key=key,
IfModifiedSince=response["LastModified"],
)
err_value = err.value
assert err_value.response["Error"] == {"Code": "304", "Message": "Not Modified"}
@mock_s3
def test_get_object_if_modified_since():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.get_object(
Bucket=bucket_name,
Key=key,
IfModifiedSince=datetime.datetime.utcnow() + datetime.timedelta(hours=1),
)
err_value = err.value
assert err_value.response["Error"] == {"Code": "304", "Message": "Not Modified"}
@mock_s3
def test_get_object_if_unmodified_since():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.get_object(
Bucket=bucket_name,
Key=key,
IfUnmodifiedSince=datetime.datetime.utcnow() - datetime.timedelta(hours=1),
)
err_value = err.value
assert err_value.response["Error"]["Code"] == "PreconditionFailed"
assert err_value.response["Error"]["Condition"] == "If-Unmodified-Since"
@mock_s3
def test_get_object_if_match():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.get_object(Bucket=bucket_name, Key=key, IfMatch='"hello"')
err_value = err.value
assert err_value.response["Error"]["Code"] == "PreconditionFailed"
assert err_value.response["Error"]["Condition"] == "If-Match"
@mock_s3
def test_get_object_if_none_match():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
etag = s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")["ETag"]
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.get_object(Bucket=bucket_name, Key=key, IfNoneMatch=etag)
err_value = err.value
assert err_value.response["Error"] == {"Code": "304", "Message": "Not Modified"}
@mock_s3
def test_head_object_if_modified_since():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.head_object(
Bucket=bucket_name,
Key=key,
IfModifiedSince=datetime.datetime.utcnow() + datetime.timedelta(hours=1),
)
err_value = err.value
assert err_value.response["Error"] == {"Code": "304", "Message": "Not Modified"}
@mock_s3
def test_head_object_if_modified_since_refresh():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
response = s3_client.head_object(Bucket=bucket_name, Key=key)
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.head_object(
Bucket=bucket_name,
Key=key,
IfModifiedSince=response["LastModified"],
)
err_value = err.value
assert err_value.response["Error"] == {"Code": "304", "Message": "Not Modified"}
@mock_s3
def test_head_object_if_unmodified_since():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.head_object(
Bucket=bucket_name,
Key=key,
IfUnmodifiedSince=datetime.datetime.utcnow() - datetime.timedelta(hours=1),
)
err_value = err.value
assert err_value.response["Error"] == {
"Code": "412",
"Message": "Precondition Failed",
}
@mock_s3
def test_head_object_if_match():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.head_object(Bucket=bucket_name, Key=key, IfMatch='"hello"')
err_value = err.value
assert err_value.response["Error"] == {
"Code": "412",
"Message": "Precondition Failed",
}
@mock_s3
def test_head_object_if_none_match():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "blah"
s3_client.create_bucket(Bucket=bucket_name)
key = "hello.txt"
etag = s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")["ETag"]
with pytest.raises(botocore.exceptions.ClientError) as err:
s3_client.head_object(Bucket=bucket_name, Key=key, IfNoneMatch=etag)
err_value = err.value
assert err_value.response["Error"] == {"Code": "304", "Message": "Not Modified"}
@mock_s3
def test_put_bucket_cors():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
s3_client.create_bucket(Bucket=bucket_name)
resp = s3_client.put_bucket_cors(
Bucket=bucket_name,
CORSConfiguration={
"CORSRules": [
{
"AllowedOrigins": ["*"],
"AllowedMethods": ["GET", "POST"],
"AllowedHeaders": ["Authorization"],
"ExposeHeaders": ["x-amz-request-id"],
"MaxAgeSeconds": 123,
},
{
"AllowedOrigins": ["*"],
"AllowedMethods": ["PUT"],
"AllowedHeaders": ["Authorization"],
"ExposeHeaders": ["x-amz-request-id"],
"MaxAgeSeconds": 123,
},
]
},
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
with pytest.raises(ClientError) as err:
s3_client.put_bucket_cors(
Bucket=bucket_name,
CORSConfiguration={
"CORSRules": [
{"AllowedOrigins": ["*"], "AllowedMethods": ["NOTREAL", "POST"]}
]
},
)
err_value = err.value
assert err_value.response["Error"]["Code"] == "InvalidRequest"
assert err_value.response["Error"]["Message"] == (
"Found unsupported HTTP method in CORS config. " "Unsupported method is NOTREAL"
)
with pytest.raises(ClientError) as err:
s3_client.put_bucket_cors(
Bucket=bucket_name, CORSConfiguration={"CORSRules": []}
)
err_value = err.value
assert err_value.response["Error"]["Code"] == "MalformedXML"
# And 101:
many_rules = [{"AllowedOrigins": ["*"], "AllowedMethods": ["GET"]}] * 101
with pytest.raises(ClientError) as err:
s3_client.put_bucket_cors(
Bucket=bucket_name, CORSConfiguration={"CORSRules": many_rules}
)
err_value = err.value
assert err_value.response["Error"]["Code"] == "MalformedXML"
@mock_s3
def test_get_bucket_cors():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
s3_client.create_bucket(Bucket=bucket_name)
# Without CORS:
with pytest.raises(ClientError) as err:
s3_client.get_bucket_cors(Bucket=bucket_name)
err_value = err.value
assert err_value.response["Error"]["Code"] == "NoSuchCORSConfiguration"
assert (
err_value.response["Error"]["Message"]
== "The CORS configuration does not exist"
)
s3_client.put_bucket_cors(
Bucket=bucket_name,
CORSConfiguration={
"CORSRules": [
{
"AllowedOrigins": ["*"],
"AllowedMethods": ["GET", "POST"],
"AllowedHeaders": ["Authorization"],
"ExposeHeaders": ["x-amz-request-id"],
"MaxAgeSeconds": 123,
},
{
"AllowedOrigins": ["*"],
"AllowedMethods": ["PUT"],
"AllowedHeaders": ["Authorization"],
"ExposeHeaders": ["x-amz-request-id"],
"MaxAgeSeconds": 123,
},
]
},
)
resp = s3_client.get_bucket_cors(Bucket=bucket_name)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
assert len(resp["CORSRules"]) == 2
@mock_s3
def test_delete_bucket_cors():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_cors(
Bucket=bucket_name,
CORSConfiguration={
"CORSRules": [{"AllowedOrigins": ["*"], "AllowedMethods": ["GET"]}]
},
)
resp = s3_client.delete_bucket_cors(Bucket=bucket_name)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 204
# Verify deletion:
with pytest.raises(ClientError) as err:
s3_client.get_bucket_cors(Bucket=bucket_name)
err_value = err.value
assert err_value.response["Error"]["Code"] == "NoSuchCORSConfiguration"
assert (
err_value.response["Error"]["Message"]
== "The CORS configuration does not exist"
)
@mock_s3
def test_put_bucket_notification():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="bucket")
# With no configuration:
result = s3_client.get_bucket_notification(Bucket="bucket")
assert not result.get("TopicConfigurations")
assert not result.get("QueueConfigurations")
assert not result.get("LambdaFunctionConfigurations")
# Place proper topic configuration:
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"TopicConfigurations": [
{
"TopicArn": "arn:aws:sns:us-east-1:012345678910:mytopic",
"Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
},
{
"TopicArn": "arn:aws:sns:us-east-1:012345678910:myothertopic",
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": "images/"},
{"Name": "suffix", "Value": "png"},
]
}
},
},
]
},
)
# Verify to completion:
result = s3_client.get_bucket_notification_configuration(Bucket="bucket")
assert len(result["TopicConfigurations"]) == 2
assert not result.get("QueueConfigurations")
assert not result.get("LambdaFunctionConfigurations")
assert (
result["TopicConfigurations"][0]["TopicArn"]
== "arn:aws:sns:us-east-1:012345678910:mytopic"
)
assert (
result["TopicConfigurations"][1]["TopicArn"]
== "arn:aws:sns:us-east-1:012345678910:myothertopic"
)
assert len(result["TopicConfigurations"][0]["Events"]) == 2
assert len(result["TopicConfigurations"][1]["Events"]) == 1
assert result["TopicConfigurations"][0]["Events"][0] == "s3:ObjectCreated:*"
assert result["TopicConfigurations"][0]["Events"][1] == "s3:ObjectRemoved:*"
assert result["TopicConfigurations"][1]["Events"][0] == "s3:ObjectCreated:*"
assert result["TopicConfigurations"][0]["Id"]
assert result["TopicConfigurations"][1]["Id"]
assert not result["TopicConfigurations"][0].get("Filter")
assert len(result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"]) == 2
assert (
result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"][0]["Name"]
== "prefix"
)
assert (
result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"][0]["Value"]
== "images/"
)
assert (
result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"][1]["Name"]
== "suffix"
)
assert (
result["TopicConfigurations"][1]["Filter"]["Key"]["FilterRules"][1]["Value"]
== "png"
)
# Place proper queue configuration:
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"QueueConfigurations": [
{
"Id": "SomeID",
"QueueArn": "arn:aws:sqs:us-east-1:012345678910:myQueue",
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {"FilterRules": [{"Name": "prefix", "Value": "images/"}]}
},
}
]
},
)
result = s3_client.get_bucket_notification_configuration(Bucket="bucket")
assert len(result["QueueConfigurations"]) == 1
assert not result.get("TopicConfigurations")
assert not result.get("LambdaFunctionConfigurations")
assert result["QueueConfigurations"][0]["Id"] == "SomeID"
assert (
result["QueueConfigurations"][0]["QueueArn"]
== "arn:aws:sqs:us-east-1:012345678910:myQueue"
)
assert result["QueueConfigurations"][0]["Events"][0] == "s3:ObjectCreated:*"
assert len(result["QueueConfigurations"][0]["Events"]) == 1
assert len(result["QueueConfigurations"][0]["Filter"]["Key"]["FilterRules"]) == 1
assert (
result["QueueConfigurations"][0]["Filter"]["Key"]["FilterRules"][0]["Name"]
== "prefix"
)
assert (
result["QueueConfigurations"][0]["Filter"]["Key"]["FilterRules"][0]["Value"]
== "images/"
)
# Place proper Lambda configuration:
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"LambdaFunctionConfigurations": [
{
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:012345678910:function:lambda",
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {"FilterRules": [{"Name": "prefix", "Value": "images/"}]}
},
}
]
},
)
result = s3_client.get_bucket_notification_configuration(Bucket="bucket")
assert len(result["LambdaFunctionConfigurations"]) == 1
assert not result.get("TopicConfigurations")
assert not result.get("QueueConfigurations")
assert result["LambdaFunctionConfigurations"][0]["Id"]
assert (
result["LambdaFunctionConfigurations"][0]["LambdaFunctionArn"]
== "arn:aws:lambda:us-east-1:012345678910:function:lambda"
)
assert (
result["LambdaFunctionConfigurations"][0]["Events"][0] == "s3:ObjectCreated:*"
)
assert len(result["LambdaFunctionConfigurations"][0]["Events"]) == 1
assert (
len(result["LambdaFunctionConfigurations"][0]["Filter"]["Key"]["FilterRules"])
== 1
)
assert (
result["LambdaFunctionConfigurations"][0]["Filter"]["Key"]["FilterRules"][0][
"Name"
]
== "prefix"
)
assert (
result["LambdaFunctionConfigurations"][0]["Filter"]["Key"]["FilterRules"][0][
"Value"
]
== "images/"
)
# And with all 3 set:
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"TopicConfigurations": [
{
"TopicArn": "arn:aws:sns:us-east-1:012345678910:mytopic",
"Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
}
],
"LambdaFunctionConfigurations": [
{
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:012345678910:function:lambda",
"Events": ["s3:ObjectCreated:*"],
}
],
"QueueConfigurations": [
{
"QueueArn": "arn:aws:sqs:us-east-1:012345678910:myQueue",
"Events": ["s3:ObjectCreated:*"],
}
],
},
)
result = s3_client.get_bucket_notification_configuration(Bucket="bucket")
assert len(result["LambdaFunctionConfigurations"]) == 1
assert len(result["TopicConfigurations"]) == 1
assert len(result["QueueConfigurations"]) == 1
# And clear it out:
s3_client.put_bucket_notification_configuration(
Bucket="bucket", NotificationConfiguration={}
)
result = s3_client.get_bucket_notification_configuration(Bucket="bucket")
assert not result.get("TopicConfigurations")
assert not result.get("QueueConfigurations")
assert not result.get("LambdaFunctionConfigurations")
@mock_s3
def test_put_bucket_notification_errors():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="bucket")
# With incorrect ARNs:
for tech in ["Queue", "Topic", "LambdaFunction"]:
with pytest.raises(ClientError) as err:
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
f"{tech}Configurations": [
{
f"{tech}Arn": "arn:aws:{}:us-east-1:012345678910:lksajdfkldskfj",
"Events": ["s3:ObjectCreated:*"],
}
]
},
)
assert err.value.response["Error"]["Code"] == "InvalidArgument"
assert err.value.response["Error"]["Message"] == "The ARN is not well formed"
# Region not the same as the bucket:
with pytest.raises(ClientError) as err:
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"QueueConfigurations": [
{
"QueueArn": "arn:aws:sqs:us-west-2:012345678910:lksajdfkldskfj",
"Events": ["s3:ObjectCreated:*"],
}
]
},
)
assert err.value.response["Error"]["Code"] == "InvalidArgument"
assert err.value.response["Error"]["Message"] == (
"The notification destination service region is not valid for "
"the bucket location constraint"
)
# Invalid event name:
with pytest.raises(ClientError) as err:
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"QueueConfigurations": [
{
"QueueArn": "arn:aws:sqs:us-east-1:012345678910:lksajdfkldskfj",
"Events": ["notarealeventname"],
}
]
},
)
assert err.value.response["Error"]["Code"] == "InvalidArgument"
assert (
err.value.response["Error"]["Message"]
== "The event is not supported for notifications"
)
@mock_s3
def test_list_object_versions():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "000" + str(uuid4())
key = "key-with-versions"
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_versioning(
Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}
)
items = (b"v1", b"v2")
for body in items:
s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)
response = s3_client.list_object_versions(Bucket=bucket_name)
# Two object versions should be returned
assert len(response["Versions"]) == 2
keys = {item["Key"] for item in response["Versions"]}
assert keys == {key}
# the first item in the list should be the latest
assert response["Versions"][0]["IsLatest"] is True
# Test latest object version is returned
response = s3_client.get_object(Bucket=bucket_name, Key=key)
assert response["Body"].read() == items[-1]
@mock_s3
def test_list_object_versions_with_delimiter():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "000" + str(uuid4())
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_versioning(
Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}
)
for key_index in list(range(1, 5)) + list(range(10, 14)):
for version_index in range(1, 4):
body = f"data-{version_index}".encode("UTF-8")
s3_client.put_object(
Bucket=bucket_name, Key=f"key{key_index}-with-data", Body=body
)
s3_client.put_object(
Bucket=bucket_name, Key=f"key{key_index}-without-data", Body=b""
)
response = s3_client.list_object_versions(Bucket=bucket_name)
# All object versions should be returned
# 8 keys * 2 (one with, one without) * 3 versions per key
assert len(response["Versions"]) == 48
# Use start of key as delimiter
response = s3_client.list_object_versions(Bucket=bucket_name, Delimiter="key1")
assert response["CommonPrefixes"] == [{"Prefix": "key1"}]
assert response["Delimiter"] == "key1"
# 3 keys that do not contain the phrase 'key1' (key2, key3, key4) * * 2 * 3
assert len(response["Versions"]) == 18
# Use in-between key as delimiter
response = s3_client.list_object_versions(Bucket=bucket_name, Delimiter="-with-")
assert response["CommonPrefixes"] == [
{"Prefix": "key1-with-"},
{"Prefix": "key10-with-"},
{"Prefix": "key11-with-"},
{"Prefix": "key12-with-"},
{"Prefix": "key13-with-"},
{"Prefix": "key2-with-"},
{"Prefix": "key3-with-"},
{"Prefix": "key4-with-"},
]
assert response["Delimiter"] == "-with-"
# key(1/10/11/12/13)-without, key(2/3/4)-without
assert len(response["Versions"]) == (8 * 1 * 3)
# Use in-between key as delimiter
response = s3_client.list_object_versions(Bucket=bucket_name, Delimiter="1-with-")
assert response["CommonPrefixes"] == (
[{"Prefix": "key1-with-"}, {"Prefix": "key11-with-"}]
)
assert response["Delimiter"] == "1-with-"
assert len(response["Versions"]) == 42
all_keys = {v["Key"] for v in response["Versions"]}
assert "key1-without-data" in all_keys
assert "key1-with-data" not in all_keys
assert "key4-with-data" in all_keys
assert "key4-without-data" in all_keys
# Use in-between key as delimiter + prefix
response = s3_client.list_object_versions(
Bucket=bucket_name, Prefix="key1", Delimiter="with-"
)
assert response["CommonPrefixes"] == [
{"Prefix": "key1-with-"},
{"Prefix": "key10-with-"},
{"Prefix": "key11-with-"},
{"Prefix": "key12-with-"},
{"Prefix": "key13-with-"},
]
assert response["Delimiter"] == "with-"
assert response["KeyMarker"] == ""
assert "NextKeyMarker" not in response
assert len(response["Versions"]) == 15
all_keys = {v["Key"] for v in response["Versions"]}
assert all_keys == {
"key1-without-data",
"key10-without-data",
"key11-without-data",
"key13-without-data",
"key12-without-data",
}
# Start at KeyMarker, and filter using Prefix+Delimiter for all subsequent keys
response = s3_client.list_object_versions(
Bucket=bucket_name, Prefix="key1", Delimiter="with-", KeyMarker="key11"
)
assert response["CommonPrefixes"] == [
{"Prefix": "key11-with-"},
{"Prefix": "key12-with-"},
{"Prefix": "key13-with-"},
]
assert response["Delimiter"] == "with-"
assert response["KeyMarker"] == "key11"
assert "NextKeyMarker" not in response
assert len(response["Versions"]) == 9
all_keys = {v["Key"] for v in response["Versions"]}
assert all_keys == (
{"key11-without-data", "key12-without-data", "key13-without-data"}
)
# Delimiter with Prefix being the entire key
response = s3_client.list_object_versions(
Bucket=bucket_name, Prefix="key1-with-data", Delimiter="-"
)
assert len(response["Versions"]) == 3
assert "CommonPrefixes" not in response
# Delimiter without prefix
response = s3_client.list_object_versions(Bucket=bucket_name, Delimiter="-with-")
assert len(response["CommonPrefixes"]) == 8
assert {"Prefix": "key1-with-"} in response["CommonPrefixes"]
# Should return all keys -without-data
assert len(response["Versions"]) == 24
@mock_s3
def test_list_object_versions_with_delimiter_for_deleted_objects():
bucket_name = "tests_bucket"
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
# Create bucket with versioning
client.create_bucket(Bucket=bucket_name)
client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={"MFADelete": "Disabled", "Status": "Enabled"},
)
# Create a history of objects
for pos in range(2):
client.put_object(
Bucket=bucket_name, Key=f"obj_{pos}", Body=f"object {pos}".encode("utf-8")
)
for pos in range(2):
client.put_object(
Bucket=bucket_name,
Key=f"hist_obj_{pos}",
Body=f"history object {pos}".encode("utf-8"),
)
for hist_pos in range(2):
client.put_object(
Bucket=bucket_name,
Key=f"hist_obj_{pos}",
Body=f"object {pos} {hist_pos}".encode("utf-8"),
)
for pos in range(2):
client.put_object(
Bucket=bucket_name,
Key=f"del_obj_{pos}",
Body=f"deleted object {pos}".encode("utf-8"),
)
client.delete_object(Bucket=bucket_name, Key=f"del_obj_{pos}")
# Verify we only retrieve the DeleteMarkers that have this prefix
objs = client.list_object_versions(Bucket=bucket_name)
assert [dm["Key"] for dm in objs["DeleteMarkers"]] == ["del_obj_0", "del_obj_1"]
hist_objs = client.list_object_versions(Bucket=bucket_name, Prefix="hist_obj")
assert "DeleteMarkers" not in hist_objs
del_objs = client.list_object_versions(Bucket=bucket_name, Prefix="del_obj_0")
assert [dm["Key"] for dm in del_objs["DeleteMarkers"]] == ["del_obj_0"]
@mock_s3
def test_list_object_versions_with_versioning_disabled():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
key = "key-with-versions"
s3_client.create_bucket(Bucket=bucket_name)
items = (b"v1", b"v2")
for body in items:
s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)
response = s3_client.list_object_versions(Bucket=bucket_name)
# One object version should be returned
assert len(response["Versions"]) == 1
assert response["Versions"][0]["Key"] == key
# The version id should be the string null
assert response["Versions"][0]["VersionId"] == "null"
# Test latest object version is returned
response = s3_client.get_object(Bucket=bucket_name, Key=key)
assert "VersionId" not in response["ResponseMetadata"]["HTTPHeaders"]
assert response["Body"].read() == items[-1]
@mock_s3
def test_list_object_versions_with_versioning_enabled_late():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
key = "key-with-versions"
s3_client.create_bucket(Bucket=bucket_name)
items = (b"v1", b"v2")
s3_client.put_object(Bucket=bucket_name, Key=key, Body=b"v1")
s3_client.put_bucket_versioning(
Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}
)
s3_client.put_object(Bucket=bucket_name, Key=key, Body=b"v2")
response = s3_client.list_object_versions(Bucket=bucket_name)
# Two object versions should be returned
assert len(response["Versions"]) == 2
keys = {item["Key"] for item in response["Versions"]}
assert keys == {key}
# There should still be a null version id.
versions_id = {item["VersionId"] for item in response["Versions"]}
assert "null" in versions_id
# Test latest object version is returned
response = s3_client.get_object(Bucket=bucket_name, Key=key)
assert response["Body"].read() == items[-1]
@mock_s3
def test_bad_prefix_list_object_versions():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
key = "key-with-versions"
bad_prefix = "key-that-does-not-exist"
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_versioning(
Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}
)
items = (b"v1", b"v2")
for body in items:
s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)
response = s3_client.list_object_versions(Bucket=bucket_name, Prefix=bad_prefix)
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
assert "Versions" not in response
assert "DeleteMarkers" not in response
@mock_s3
def test_delete_markers():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
key = "key-with-versions-and-unicode-ó"
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_versioning(
Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}
)
items = (b"v1", b"v2")
for body in items:
s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)
s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": [{"Key": key}]})
with pytest.raises(ClientError) as exc:
s3_client.get_object(Bucket=bucket_name, Key=key)
assert exc.value.response["Error"]["Code"] == "NoSuchKey"
response = s3_client.list_object_versions(Bucket=bucket_name)
assert len(response["Versions"]) == 2
assert len(response["DeleteMarkers"]) == 1
s3_client.delete_object(
Bucket=bucket_name, Key=key, VersionId=response["DeleteMarkers"][0]["VersionId"]
)
response = s3_client.get_object(Bucket=bucket_name, Key=key)
assert response["Body"].read() == items[-1]
response = s3_client.list_object_versions(Bucket=bucket_name)
assert len(response["Versions"]) == 2
# We've asserted there is only 2 records so one is newest, one is oldest
latest = list(filter(lambda item: item["IsLatest"], response["Versions"]))[0]
oldest = list(filter(lambda item: not item["IsLatest"], response["Versions"]))[0]
# Double check ordering of version ID's
assert latest["VersionId"] != oldest["VersionId"]
# Double check the name is still unicode
assert latest["Key"] == "key-with-versions-and-unicode-ó"
assert oldest["Key"] == "key-with-versions-and-unicode-ó"
@mock_s3
def test_multiple_delete_markers():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
key = "key-with-versions-and-unicode-ó"
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_versioning(
Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}
)
items = (b"v1", b"v2")
for body in items:
s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)
# Delete the object twice to add multiple delete markers
s3_client.delete_object(Bucket=bucket_name, Key=key)
s3_client.delete_object(Bucket=bucket_name, Key=key)
response = s3_client.list_object_versions(Bucket=bucket_name)
assert len(response["DeleteMarkers"]) == 2
with pytest.raises(ClientError) as exc:
s3_client.get_object(Bucket=bucket_name, Key=key)
assert exc.response["Error"]["Code"] == "404"
# Remove both delete markers to restore the object
s3_client.delete_object(
Bucket=bucket_name, Key=key, VersionId=response["DeleteMarkers"][0]["VersionId"]
)
s3_client.delete_object(
Bucket=bucket_name, Key=key, VersionId=response["DeleteMarkers"][1]["VersionId"]
)
response = s3_client.get_object(Bucket=bucket_name, Key=key)
assert response["Body"].read() == items[-1]
response = s3_client.list_object_versions(Bucket=bucket_name)
assert len(response["Versions"]) == 2
# We've asserted there is only 2 records so one is newest, one is oldest
latest = list(filter(lambda item: item["IsLatest"], response["Versions"]))[0]
oldest = list(filter(lambda item: not item["IsLatest"], response["Versions"]))[0]
# Double check ordering of version ID's
assert latest["VersionId"] != oldest["VersionId"]
# Double check the name is still unicode
assert latest["Key"] == "key-with-versions-and-unicode-ó"
assert oldest["Key"] == "key-with-versions-and-unicode-ó"
@mock_s3
def test_get_stream_gzipped():
payload = b"this is some stuff here"
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="moto-tests")
buffer_ = BytesIO()
with GzipFile(fileobj=buffer_, mode="w") as fhandle:
fhandle.write(payload)
payload_gz = buffer_.getvalue()
s3_client.put_object(
Bucket="moto-tests", Key="keyname", Body=payload_gz, ContentEncoding="gzip"
)
obj = s3_client.get_object(Bucket="moto-tests", Key="keyname")
res = zlib.decompress(obj["Body"].read(), 16 + zlib.MAX_WBITS)
assert res == payload
TEST_XML = """\
<?xml version="1.0" encoding="UTF-8"?>
<ns0:WebsiteConfiguration xmlns:ns0="http://s3.amazonaws.com/doc/2006-03-01/">
<ns0:IndexDocument>
<ns0:Suffix>index.html</ns0:Suffix>
</ns0:IndexDocument>
<ns0:RoutingRules>
<ns0:RoutingRule>
<ns0:Condition>
<ns0:KeyPrefixEquals>test/testing</ns0:KeyPrefixEquals>
</ns0:Condition>
<ns0:Redirect>
<ns0:ReplaceKeyWith>test.txt</ns0:ReplaceKeyWith>
</ns0:Redirect>
</ns0:RoutingRule>
</ns0:RoutingRules>
</ns0:WebsiteConfiguration>
"""
@mock_s3
def test_bucket_name_too_long():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
with pytest.raises(ClientError) as exc:
s3_client.create_bucket(Bucket="x" * 64)
assert exc.value.response["Error"]["Code"] == "InvalidBucketName"
@mock_s3
def test_bucket_name_too_short():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
with pytest.raises(ClientError) as exc:
s3_client.create_bucket(Bucket="x" * 2)
assert exc.value.response["Error"]["Code"] == "InvalidBucketName"
@mock_s3
def test_accelerated_none_when_unspecified():
bucket_name = "some_bucket"
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
resp = s3_client.get_bucket_accelerate_configuration(Bucket=bucket_name)
assert "Status" not in resp
@mock_s3
def test_can_enable_bucket_acceleration():
bucket_name = "some_bucket"
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
resp = s3_client.put_bucket_accelerate_configuration(
Bucket=bucket_name, AccelerateConfiguration={"Status": "Enabled"}
)
assert len(resp.keys()) == 1 # Response contains nothing (only HTTP headers)
resp = s3_client.get_bucket_accelerate_configuration(Bucket=bucket_name)
assert "Status" in resp
assert resp["Status"] == "Enabled"
@mock_s3
def test_can_suspend_bucket_acceleration():
bucket_name = "some_bucket"
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
resp = s3_client.put_bucket_accelerate_configuration(
Bucket=bucket_name, AccelerateConfiguration={"Status": "Enabled"}
)
resp = s3_client.put_bucket_accelerate_configuration(
Bucket=bucket_name, AccelerateConfiguration={"Status": "Suspended"}
)
assert len(resp.keys()) == 1 # Response contains nothing (only HTTP headers)
resp = s3_client.get_bucket_accelerate_configuration(Bucket=bucket_name)
assert "Status" in resp
assert resp["Status"] == "Suspended"
@mock_s3
def test_suspending_acceleration_on_not_configured_bucket_does_nothing():
bucket_name = "some_bucket"
s3_client = boto3.client("s3")
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
)
resp = s3_client.put_bucket_accelerate_configuration(
Bucket=bucket_name, AccelerateConfiguration={"Status": "Suspended"}
)
assert len(resp.keys()) == 1 # Response contains nothing (only HTTP headers)
resp = s3_client.get_bucket_accelerate_configuration(Bucket=bucket_name)
assert "Status" not in resp
@mock_s3
def test_accelerate_configuration_status_validation():
bucket_name = "some_bucket"
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
with pytest.raises(ClientError) as exc:
s3_client.put_bucket_accelerate_configuration(
Bucket=bucket_name, AccelerateConfiguration={"Status": "bad_status"}
)
assert exc.value.response["Error"]["Code"] == "MalformedXML"
@mock_s3
def test_accelerate_configuration_is_not_supported_when_bucket_name_has_dots():
bucket_name = "some.bucket.with.dots"
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
with pytest.raises(ClientError) as exc:
s3_client.put_bucket_accelerate_configuration(
Bucket=bucket_name, AccelerateConfiguration={"Status": "Enabled"}
)
assert exc.value.response["Error"]["Code"] == "InvalidRequest"
def store_and_read_back_a_key(key):
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
body = b"Some body"
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)
response = s3_client.get_object(Bucket=bucket_name, Key=key)
assert response["Body"].read() == body
@mock_s3
def test_paths_with_leading_slashes_work():
store_and_read_back_a_key("/a-key")
@mock_s3
def test_root_dir_with_empty_name_works():
if settings.TEST_SERVER_MODE:
raise SkipTest("Does not work in server mode due to error in Workzeug")
store_and_read_back_a_key("/")
@pytest.mark.parametrize("bucket_name", ["mybucket", "my.bucket"])
@mock_s3
def test_leading_slashes_not_removed(bucket_name):
"""Make sure that leading slashes are not removed internally."""
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
uploaded_key = "/key"
invalid_key_1 = "key"
invalid_key_2 = "//key"
s3_client.put_object(Bucket=bucket_name, Key=uploaded_key, Body=b"Some body")
with pytest.raises(ClientError) as exc:
s3_client.get_object(Bucket=bucket_name, Key=invalid_key_1)
assert exc.value.response["Error"]["Code"] == "NoSuchKey"
with pytest.raises(ClientError) as exc:
s3_client.get_object(Bucket=bucket_name, Key=invalid_key_2)
assert exc.value.response["Error"]["Code"] == "NoSuchKey"
@pytest.mark.parametrize(
"key", ["foo/bar/baz", "foo", "foo/run_dt%3D2019-01-01%252012%253A30%253A00"]
)
@mock_s3
def test_delete_objects_with_url_encoded_key(key):
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
body = b"Some body"
s3_client.create_bucket(Bucket=bucket_name)
def put_object():
s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)
def assert_deleted():
with pytest.raises(ClientError) as exc:
s3_client.get_object(Bucket=bucket_name, Key=key)
assert exc.value.response["Error"]["Code"] == "NoSuchKey"
put_object()
s3_client.delete_object(Bucket=bucket_name, Key=key)
assert_deleted()
put_object()
s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": [{"Key": key}]})
assert_deleted()
@mock_s3
def test_delete_objects_unknown_key():
bucket_name = "test-moto-issue-1581"
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket=bucket_name)
client.put_object(Bucket=bucket_name, Key="file1", Body="body")
objs = client.delete_objects(
Bucket=bucket_name, Delete={"Objects": [{"Key": "file1"}, {"Key": "file2"}]}
)
assert len(objs["Deleted"]) == 2
assert {"Key": "file1"} in objs["Deleted"]
assert {"Key": "file2"} in objs["Deleted"]
client.delete_bucket(Bucket=bucket_name)
@mock_s3
@mock_config
def test_public_access_block():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="mybucket")
# Try to get the public access block (should not exist by default)
with pytest.raises(ClientError) as exc:
client.get_public_access_block(Bucket="mybucket")
assert exc.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
assert (
exc.value.response["Error"]["Message"]
== "The public access block configuration was not found"
)
assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404
# Put a public block in place:
test_map = {
"BlockPublicAcls": False,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
for field in test_map:
# Toggle:
test_map[field] = True
client.put_public_access_block(
Bucket="mybucket", PublicAccessBlockConfiguration=test_map
)
# Test:
assert (
test_map
== client.get_public_access_block(Bucket="mybucket")[
"PublicAccessBlockConfiguration"
]
)
# Assume missing values are default False:
client.put_public_access_block(
Bucket="mybucket", PublicAccessBlockConfiguration={"BlockPublicAcls": True}
)
assert client.get_public_access_block(Bucket="mybucket")[
"PublicAccessBlockConfiguration"
] == {
"BlockPublicAcls": True,
"IgnorePublicAcls": False,
"BlockPublicPolicy": False,
"RestrictPublicBuckets": False,
}
# Test with a blank PublicAccessBlockConfiguration:
with pytest.raises(ClientError) as exc:
client.put_public_access_block(
Bucket="mybucket", PublicAccessBlockConfiguration={}
)
assert exc.value.response["Error"]["Code"] == "InvalidRequest"
assert (
exc.value.response["Error"]["Message"]
== "Must specify at least one configuration."
)
assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400
# Test that things work with AWS Config:
config_client = boto3.client("config", region_name=DEFAULT_REGION_NAME)
result = config_client.get_resource_config_history(
resourceType="AWS::S3::Bucket", resourceId="mybucket"
)
pub_block_config = json.loads(
result["configurationItems"][0]["supplementaryConfiguration"][
"PublicAccessBlockConfiguration"
]
)
assert pub_block_config == {
"blockPublicAcls": True,
"ignorePublicAcls": False,
"blockPublicPolicy": False,
"restrictPublicBuckets": False,
}
# Delete:
client.delete_public_access_block(Bucket="mybucket")
with pytest.raises(ClientError) as exc:
client.get_public_access_block(Bucket="mybucket")
assert exc.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
@mock_s3
def test_creating_presigned_post():
bucket = "presigned-test"
s3_client = boto3.client("s3", region_name="us-east-1")
s3_client.create_bucket(Bucket=bucket)
success_url = "http://localhost/completed"
fdata = b"test data\n"
file_uid = uuid.uuid4()
conditions = [
{"Content-Type": "text/plain"},
{"x-amz-server-side-encryption": "AES256"},
{"success_action_redirect": success_url},
]
conditions.append(["content-length-range", 1, 30])
real_key = f"{file_uid}.txt"
data = s3_client.generate_presigned_post(
Bucket=bucket,
Key=real_key,
Fields={
"content-type": "text/plain",
"success_action_redirect": success_url,
"x-amz-server-side-encryption": "AES256",
},
Conditions=conditions,
ExpiresIn=1000,
)
resp = requests.post(
data["url"], data=data["fields"], files={"file": fdata}, allow_redirects=False
)
assert resp.status_code == 303
redirect = resp.headers["Location"]
assert redirect.startswith(success_url)
parts = urlparse(redirect)
args = parse_qs(parts.query)
assert args["key"][0] == real_key
assert args["bucket"][0] == bucket
assert s3_client.get_object(Bucket=bucket, Key=real_key)["Body"].read() == fdata
@mock_s3
def test_presigned_put_url_with_approved_headers():
bucket = str(uuid.uuid4())
key = "file.txt"
content = b"filecontent"
expected_contenttype = "app/sth"
conn = boto3.resource("s3", region_name="us-east-1")
conn.create_bucket(Bucket=bucket)
s3_client = boto3.client("s3", region_name="us-east-1")
# Create a pre-signed url with some metadata.
url = s3_client.generate_presigned_url(
ClientMethod="put_object",
Params={"Bucket": bucket, "Key": key, "ContentType": expected_contenttype},
)
# Verify S3 throws an error when the header is not provided
response = requests.put(url, data=content)
assert response.status_code == 403
assert "<Code>SignatureDoesNotMatch</Code>" in str(response.content)
assert (
"<Message>The request signature we calculated does not match the "
"signature you provided. Check your key and signing method.</Message>"
) in str(response.content)
# Verify S3 throws an error when the header has the wrong value
response = requests.put(
url, data=content, headers={"Content-Type": "application/unknown"}
)
assert response.status_code == 403
assert "<Code>SignatureDoesNotMatch</Code>" in str(response.content)
assert (
"<Message>The request signature we calculated does not match the "
"signature you provided. Check your key and signing method.</Message>"
) in str(response.content)
# Verify S3 uploads correctly when providing the meta data
response = requests.put(
url, data=content, headers={"Content-Type": expected_contenttype}
)
assert response.status_code == 200
# Assert the object exists
obj = s3_client.get_object(Bucket=bucket, Key=key)
assert obj["ContentType"] == expected_contenttype
assert obj["ContentLength"] == 11
assert obj["Body"].read() == content
assert obj["Metadata"] == {}
s3_client.delete_object(Bucket=bucket, Key=key)
s3_client.delete_bucket(Bucket=bucket)
@mock_s3
def test_presigned_put_url_with_custom_headers():
bucket = str(uuid.uuid4())
key = "file.txt"
content = b"filecontent"
conn = boto3.resource("s3", region_name="us-east-1")
conn.create_bucket(Bucket=bucket)
s3_client = boto3.client("s3", region_name="us-east-1")
# Create a pre-signed url with some metadata.
url = s3_client.generate_presigned_url(
ClientMethod="put_object",
Params={"Bucket": bucket, "Key": key, "Metadata": {"venue": "123"}},
)
# Verify S3 uploads correctly when providing the meta data
response = requests.put(url, data=content)
assert response.status_code == 200
# Assert the object exists
obj = s3_client.get_object(Bucket=bucket, Key=key)
assert obj["ContentLength"] == 11
assert obj["Body"].read() == content
assert obj["Metadata"] == {"venue": "123"}
s3_client.delete_object(Bucket=bucket, Key=key)
s3_client.delete_bucket(Bucket=bucket)
@mock_s3
def test_request_partial_content_should_contain_content_length():
bucket = "bucket"
object_key = "key"
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket=bucket)
s3_resource.Object(bucket, object_key).put(Body="some text")
file = s3_resource.Object(bucket, object_key)
response = file.get(Range="bytes=0-1024")
assert response["ContentLength"] == 9
@mock_s3
def test_request_partial_content_should_contain_actual_content_length():
bucket = "bucket"
object_key = "key"
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket=bucket)
s3_resource.Object(bucket, object_key).put(Body="some text")
file = s3_resource.Object(bucket, object_key)
requested_range = "bytes=1024-"
try:
file.get(Range=requested_range)
except botocore.client.ClientError as exc:
assert exc.response["Error"]["Code"] == "InvalidRange"
assert (
exc.response["Error"]["Message"] == "The requested range is not satisfiable"
)
assert exc.response["Error"]["ActualObjectSize"] == "9"
assert exc.response["Error"]["RangeRequested"] == requested_range
@mock_s3
def test_get_unknown_version_should_throw_specific_error():
bucket_name = "my_bucket"
object_key = "hello.txt"
s3_resource = boto3.resource("s3", region_name="us-east-1")
client = boto3.client("s3", region_name="us-east-1")
bucket = s3_resource.create_bucket(Bucket=bucket_name)
bucket.Versioning().enable()
content = "some text"
s3_resource.Object(bucket_name, object_key).put(Body=content)
with pytest.raises(ClientError) as exc:
client.get_object(Bucket=bucket_name, Key=object_key, VersionId="unknown")
assert exc.value.response["Error"]["Code"] == "NoSuchVersion"
assert exc.value.response["Error"]["Message"] == (
"The specified version does not exist."
)
@mock_s3
def test_request_partial_content_without_specifying_range_should_return_full_object():
bucket = "bucket"
object_key = "key"
s3_resource = boto3.resource("s3", region_name="us-east-1")
s3_resource.create_bucket(Bucket=bucket)
s3_resource.Object(bucket, object_key).put(Body="some text that goes a long way")
file = s3_resource.Object(bucket, object_key)
response = file.get(Range="")
assert response["ContentLength"] == 30
@mock_s3
def test_object_headers():
bucket = "my-bucket"
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket)
res = s3_client.put_object(
Bucket=bucket,
Body=b"test",
Key="file.txt",
ServerSideEncryption="aws:kms",
SSEKMSKeyId="test",
BucketKeyEnabled=True,
)
assert "ETag" in res
assert "ServerSideEncryption" in res
assert "SSEKMSKeyId" in res
assert "BucketKeyEnabled" in res
res = s3_client.get_object(Bucket=bucket, Key="file.txt")
assert "ETag" in res
assert "ServerSideEncryption" in res
assert "SSEKMSKeyId" in res
assert "BucketKeyEnabled" in res
if settings.TEST_SERVER_MODE:
@mock_s3
def test_upload_data_without_content_type():
bucket = "mybucket"
s3_client = boto3.client("s3")
s3_client.create_bucket(Bucket=bucket)
data_input = b"some data 123 321"
req = requests.put("http://localhost:5000/mybucket/test.txt", data=data_input)
assert req.status_code == 200
res = s3_client.get_object(Bucket=bucket, Key="test.txt")
data = res["Body"].read()
assert data == data_input
@mock_s3
@pytest.mark.parametrize(
"prefix", ["file", "file+else", "file&another", "file another"]
)
def test_get_object_versions_with_prefix(prefix):
bucket_name = "testbucket-3113"
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
bucket_versioning = s3_resource.BucketVersioning(bucket_name)
bucket_versioning.enable()
s3_client.put_object(Bucket=bucket_name, Body=b"test", Key=f"{prefix}.txt")
s3_client.put_object(Bucket=bucket_name, Body=b"test", Key=f"{prefix}.txt")
s3_client.put_object(Bucket=bucket_name, Body=b"alttest", Key=f"alt{prefix}.txt")
s3_client.put_object(Bucket=bucket_name, Body=b"test", Key=f"{prefix}.txt")
versions = s3_client.list_object_versions(Bucket=bucket_name, Prefix=prefix)
assert len(versions["Versions"]) == 3
assert versions["Prefix"] == prefix
@mock_s3
def test_create_bucket_duplicate():
bucket_name = "same-bucket-test-1371"
alternate_region = "eu-north-1"
# Create it in the default region
default_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
default_client.create_bucket(Bucket=bucket_name)
# Create it again in the same region - should just return that same bucket
default_client.create_bucket(Bucket=bucket_name)
# Create the bucket in a different region - should return an error
diff_client = boto3.client("s3", region_name=alternate_region)
with pytest.raises(ClientError) as ex:
diff_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": alternate_region},
)
err = ex.value.response["Error"]
assert err["Code"] == "BucketAlreadyOwnedByYou"
assert err["Message"] == (
"Your previous request to create the named bucket succeeded and you already own it."
)
assert err["BucketName"] == bucket_name
# Try this again - but creating the bucket in a non-default region in the first place
bucket_name = "same-bucket-nondefault-region-test-1371"
diff_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": alternate_region},
)
# Recreating the bucket in the same non-default region should fail
with pytest.raises(ClientError) as ex:
diff_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": alternate_region},
)
err = ex.value.response["Error"]
assert err["Code"] == "BucketAlreadyOwnedByYou"
assert err["Message"] == (
"Your previous request to create the named bucket succeeded and you already own it."
)
assert err["BucketName"] == bucket_name
# Recreating the bucket in the default region should fail
diff_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
with pytest.raises(ClientError) as ex:
diff_client.create_bucket(Bucket=bucket_name)
err = ex.value.response["Error"]
assert err["Code"] == "BucketAlreadyOwnedByYou"
assert err["Message"] == (
"Your previous request to create the named bucket succeeded and you already own it."
)
assert err["BucketName"] == bucket_name
# Recreating the bucket in a third region should fail
diff_client = boto3.client("s3", region_name="ap-northeast-1")
with pytest.raises(ClientError) as ex:
diff_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": "ap-northeast-1"},
)
err = ex.value.response["Error"]
assert err["Code"] == "BucketAlreadyOwnedByYou"
assert err["Message"] == (
"Your previous request to create the named bucket succeeded and you already own it."
)
assert err["BucketName"] == bucket_name
@mock_s3
def test_delete_objects_with_empty_keyname():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "testbucket-4077"
bucket = resource.create_bucket(Bucket=bucket_name)
key_name = " "
bucket.put_object(Key=key_name, Body=b"")
assert len(client.list_objects(Bucket=bucket_name)["Contents"]) == 1
bucket.delete_objects(Delete={"Objects": [{"Key": key_name}]})
assert "Contents" not in client.list_objects(Bucket=bucket_name)
bucket.put_object(Key=key_name, Body=b"")
client.delete_object(Bucket=bucket_name, Key=key_name)
assert "Contents" not in client.list_objects(Bucket=bucket_name)
@mock_s3
def test_head_object_should_return_default_content_type():
s3_resource = boto3.resource("s3", region_name="us-east-1")
s3_resource.create_bucket(Bucket="testbucket")
s3_resource.Bucket("testbucket").upload_fileobj(
BytesIO(b"foobar"), Key="testobject"
)
s3_client = boto3.client("s3", region_name="us-east-1")
resp = s3_client.head_object(Bucket="testbucket", Key="testobject")
assert resp["ContentType"] == "binary/octet-stream"
assert resp["ResponseMetadata"]["HTTPHeaders"]["content-type"] == (
"binary/octet-stream"
)
assert (
s3_resource.Object("testbucket", "testobject").content_type
== "binary/octet-stream"
)
@mock_s3
def test_request_partial_content_should_contain_all_metadata():
# github.com/getmoto/moto/issues/4203
bucket = "bucket"
object_key = "key"
body = "some text"
query_range = "0-3"
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket=bucket)
obj = boto3.resource("s3").Object(bucket, object_key)
obj.put(Body=body)
response = obj.get(Range=f"bytes={query_range}")
assert response["ETag"] == obj.e_tag
assert response["LastModified"] == obj.last_modified
assert response["ContentLength"] == 4
assert response["ContentRange"] == f"bytes {query_range}/{len(body)}"
@mock_s3
def test_head_versioned_key_in_not_versioned_bucket():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="simple-bucked")
with pytest.raises(ClientError) as ex:
client.head_object(
Bucket="simple-bucked", Key="file.txt", VersionId="noVersion"
)
response = ex.value.response
assert response["Error"]["Code"] == "400"
@mock_s3
def test_prefix_encoding():
bucket_name = "encoding-bucket"
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket=bucket_name)
client.put_object(Bucket=bucket_name, Key="foo%2Fbar/data", Body=b"")
data = client.list_objects_v2(Bucket=bucket_name, Prefix="foo%2Fbar")
assert data["Contents"][0]["Key"].startswith(data["Prefix"])
data = client.list_objects_v2(Bucket=bucket_name, Prefix="foo%2Fbar", Delimiter="/")
assert data["CommonPrefixes"] == [{"Prefix": "foo%2Fbar/"}]
client.put_object(Bucket=bucket_name, Key="foo/bar/data", Body=b"")
data = client.list_objects_v2(Bucket=bucket_name, Delimiter="/")
folders = list(
map(lambda common_prefix: common_prefix["Prefix"], data["CommonPrefixes"])
)
assert ["foo%2Fbar/", "foo/"] == folders
@mock_s3
@pytest.mark.parametrize("algorithm", ["CRC32", "CRC32C", "SHA1", "SHA256"])
def test_checksum_response(algorithm):
bucket_name = "checksum-bucket"
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket=bucket_name)
if (
algorithm != "CRC32C"
): # awscrt is required to allow botocore checksum with CRC32C
response = client.put_object(
Bucket=bucket_name,
Key="test-key",
Body=b"data",
ChecksumAlgorithm=algorithm,
)
assert f"Checksum{algorithm}" in response
@mock_s3
def test_cross_account_region_access():
if settings.TEST_SERVER_MODE:
raise SkipTest("Multi-accounts env config only works serverside")
client1 = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client2 = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
account2 = "222222222222"
bucket_name = "cross-account-bucket"
key = "test-key"
# Create a bucket in the default account
client1.create_bucket(Bucket=bucket_name)
client1.put_object(Bucket=bucket_name, Key=key, Body=b"data")
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": account2}):
# Ensure the bucket can be retrieved from another account
response = client2.list_objects(Bucket=bucket_name)
assert len(response["Contents"]) == 1
assert response["Contents"][0]["Key"] == key
assert client2.get_object(Bucket=bucket_name, Key=key)
assert client2.put_object(Bucket=bucket_name, Key=key, Body=b"kaytranada")
# Ensure bucket namespace is shared across accounts
with pytest.raises(ClientError) as exc:
client2.create_bucket(Bucket=bucket_name)
assert exc.value.response["Error"]["Code"] == "BucketAlreadyExists"
assert exc.value.response["Error"]["Message"] == (
"The requested bucket name is not available. The bucket "
"namespace is shared by all users of the system. Please "
"select a different name and try again"
)
# Ensure bucket name can be reused if it is deleted
client1.delete_object(Bucket=bucket_name, Key=key)
client1.delete_bucket(Bucket=bucket_name)
with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": account2}):
assert client2.create_bucket(Bucket=bucket_name)