moto/tests/test_s3/test_s3_multipart.py

1063 lines
32 KiB
Python
Raw Normal View History

from functools import wraps
from io import BytesIO
import os
import re
2022-02-24 21:42:38 +00:00
import boto3
from botocore.client import ClientError
import pytest
import requests
from unittest import SkipTest
2022-02-24 21:42:38 +00:00
from moto import settings, mock_s3
import moto.s3.models as s3model
from moto.s3.responses import DEFAULT_REGION_NAME
2023-09-27 18:34:30 +00:00
from moto.settings import (
get_s3_default_key_buffer_size,
S3_UPLOAD_PART_MIN_SIZE,
test_proxy_mode,
)
from tests import DEFAULT_ACCOUNT_ID
2023-09-27 18:34:30 +00:00
from .test_s3 import add_proxy_details
2022-02-24 21:42:38 +00:00
if settings.TEST_DECORATOR_MODE:
2022-02-24 21:42:38 +00:00
REDUCED_PART_SIZE = 256
EXPECTED_ETAG = '"66d1a1a2ed08fd05c137f316af4ff255-2"'
else:
REDUCED_PART_SIZE = S3_UPLOAD_PART_MIN_SIZE
EXPECTED_ETAG = '"140f92a6df9f9e415f74a1463bcee9bb-2"'
2022-02-24 21:42:38 +00:00
def reduced_min_part_size(func):
"""Speed up tests by temporarily making multipart min. part size small."""
2022-02-24 21:42:38 +00:00
orig_size = S3_UPLOAD_PART_MIN_SIZE
@wraps(func)
2022-02-24 21:42:38 +00:00
def wrapped(*args, **kwargs):
try:
s3model.S3_UPLOAD_PART_MIN_SIZE = REDUCED_PART_SIZE
return func(*args, **kwargs)
2022-02-24 21:42:38 +00:00
finally:
s3model.S3_UPLOAD_PART_MIN_SIZE = orig_size
return wrapped
@mock_s3
def test_default_key_buffer_size():
# save original DEFAULT_KEY_BUFFER_SIZE environment variable content
original_default_key_buffer_size = os.environ.get(
"MOTO_S3_DEFAULT_KEY_BUFFER_SIZE", None
)
os.environ["MOTO_S3_DEFAULT_KEY_BUFFER_SIZE"] = "2" # 2 bytes
assert get_s3_default_key_buffer_size() == 2
fake_key = s3model.FakeKey("a", os.urandom(1), account_id=DEFAULT_ACCOUNT_ID)
assert fake_key._value_buffer._rolled is False
2022-02-24 21:42:38 +00:00
os.environ["MOTO_S3_DEFAULT_KEY_BUFFER_SIZE"] = "1" # 1 byte
assert get_s3_default_key_buffer_size() == 1
fake_key = s3model.FakeKey("a", os.urandom(3), account_id=DEFAULT_ACCOUNT_ID)
assert fake_key._value_buffer._rolled is True
2022-02-24 21:42:38 +00:00
# if no MOTO_S3_DEFAULT_KEY_BUFFER_SIZE env variable is present the
# buffer size should be less than S3_UPLOAD_PART_MIN_SIZE to prevent
# in memory caching of multi part uploads.
2022-02-24 21:42:38 +00:00
del os.environ["MOTO_S3_DEFAULT_KEY_BUFFER_SIZE"]
assert get_s3_default_key_buffer_size() < S3_UPLOAD_PART_MIN_SIZE
# restore original environment variable content
if original_default_key_buffer_size:
os.environ["MOTO_S3_DEFAULT_KEY_BUFFER_SIZE"] = original_default_key_buffer_size
@mock_s3
def test_multipart_upload_too_small():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
2022-02-24 21:42:38 +00:00
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
2022-02-24 21:42:38 +00:00
multipart = client.create_multipart_upload(Bucket="foobar", Key="the-key")
2022-02-24 21:42:38 +00:00
up1 = client.upload_part(
Body=BytesIO(b"hello"),
PartNumber=1,
Bucket="foobar",
Key="the-key",
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
up2 = client.upload_part(
Body=BytesIO(b"world"),
PartNumber=2,
Bucket="foobar",
Key="the-key",
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
# Multipart with total size under 5MB is refused
with pytest.raises(ClientError) as ex:
client.complete_multipart_upload(
Bucket="foobar",
Key="the-key",
MultipartUpload={
"Parts": [
{"ETag": up1["ETag"], "PartNumber": 1},
{"ETag": up2["ETag"], "PartNumber": 2},
]
},
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
assert ex.value.response["Error"]["Code"] == "EntityTooSmall"
assert ex.value.response["Error"]["Message"] == (
2022-02-24 21:42:38 +00:00
"Your proposed upload is smaller than the minimum allowed object size."
)
@pytest.mark.parametrize("key", ["the-key", "the%20key"])
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_upload(key: str):
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
2022-02-24 21:42:38 +00:00
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
part2 = b"1"
multipart = client.create_multipart_upload(Bucket="foobar", Key=key)
2022-02-24 21:42:38 +00:00
up1 = client.upload_part(
Body=BytesIO(part1),
PartNumber=1,
Bucket="foobar",
Key=key,
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
up2 = client.upload_part(
Body=BytesIO(part2),
PartNumber=2,
Bucket="foobar",
Key=key,
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
client.complete_multipart_upload(
Bucket="foobar",
Key=key,
2022-02-24 21:42:38 +00:00
MultipartUpload={
"Parts": [
{"ETag": up1["ETag"], "PartNumber": 1},
{"ETag": up2["ETag"], "PartNumber": 2},
]
},
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
# we should get both parts as the key contents
response = client.get_object(Bucket="foobar", Key=key)
assert response["Body"].read() == part1 + part2
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_upload_out_of_order():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
2022-02-24 21:42:38 +00:00
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
part2 = b"1"
multipart = client.create_multipart_upload(Bucket="foobar", Key="the-key")
2022-02-24 21:42:38 +00:00
up1 = client.upload_part(
Body=BytesIO(part1),
PartNumber=4,
Bucket="foobar",
Key="the-key",
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
up2 = client.upload_part(
Body=BytesIO(part2),
PartNumber=2,
Bucket="foobar",
Key="the-key",
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
client.complete_multipart_upload(
Bucket="foobar",
Key="the-key",
MultipartUpload={
"Parts": [
{"ETag": up1["ETag"], "PartNumber": 4},
{"ETag": up2["ETag"], "PartNumber": 2},
]
},
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
# we should get both parts as the key contents
response = client.get_object(Bucket="foobar", Key="the-key")
assert response["Body"].read() == part1 + part2
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_upload_with_headers():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
2022-02-24 21:42:38 +00:00
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
2022-06-23 09:56:21 +00:00
bucket_name = "fancymultiparttest"
key_name = "the-key"
s3_resource.create_bucket(Bucket=bucket_name)
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
multipart = client.create_multipart_upload(
2022-06-23 09:56:21 +00:00
Bucket=bucket_name,
Key=key_name,
Metadata={"meta": "data"},
StorageClass="STANDARD_IA",
ACL="authenticated-read",
2022-02-24 21:42:38 +00:00
)
up1 = client.upload_part(
Body=BytesIO(part1),
PartNumber=1,
2022-06-23 09:56:21 +00:00
Bucket=bucket_name,
Key=key_name,
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
client.complete_multipart_upload(
2022-06-23 09:56:21 +00:00
Bucket=bucket_name,
Key=key_name,
2022-02-24 21:42:38 +00:00
MultipartUpload={"Parts": [{"ETag": up1["ETag"], "PartNumber": 1}]},
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
# we should get both parts as the key contents
2022-06-23 09:56:21 +00:00
response = client.get_object(Bucket=bucket_name, Key=key_name)
assert response["Metadata"] == {"meta": "data"}
assert response["StorageClass"] == "STANDARD_IA"
2022-06-23 09:56:21 +00:00
grants = client.get_object_acl(Bucket=bucket_name, Key=key_name)["Grants"]
assert len(grants) == 2
assert {
"Grantee": {
"Type": "Group",
"URI": "http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
},
"Permission": "READ",
} in grants
2022-02-24 21:42:38 +00:00
@pytest.mark.parametrize(
"original_key_name",
[
"original-key",
"the-unicode-💩-key",
"key-with?question-mark",
"key-with%2Fembedded%2Furl%2Fencoding",
],
)
@mock_s3
@reduced_min_part_size
def test_multipart_upload_with_copy_key(original_key_name):
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="foobar")
s3_client.put_object(Bucket="foobar", Key=original_key_name, Body="key_value")
2022-02-24 21:42:38 +00:00
mpu = s3_client.create_multipart_upload(Bucket="foobar", Key="the-key")
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
up1 = s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket="foobar",
Key="the-key",
PartNumber=1,
UploadId=mpu["UploadId"],
Body=BytesIO(part1),
)
up2 = s3_client.upload_part_copy(
2022-02-24 21:42:38 +00:00
Bucket="foobar",
Key="the-key",
CopySource={"Bucket": "foobar", "Key": original_key_name},
CopySourceRange="0-3",
PartNumber=2,
UploadId=mpu["UploadId"],
)
s3_client.complete_multipart_upload(
2022-02-24 21:42:38 +00:00
Bucket="foobar",
Key="the-key",
MultipartUpload={
"Parts": [
{"ETag": up1["ETag"], "PartNumber": 1},
{"ETag": up2["CopyPartResult"]["ETag"], "PartNumber": 2},
]
},
UploadId=mpu["UploadId"],
)
response = s3_client.get_object(Bucket="foobar", Key="the-key")
assert response["Body"].read() == part1 + b"key_"
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_upload_cancel():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="foobar")
2022-02-24 21:42:38 +00:00
mpu = s3_client.create_multipart_upload(Bucket="foobar", Key="the-key")
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket="foobar",
Key="the-key",
PartNumber=1,
UploadId=mpu["UploadId"],
Body=BytesIO(part1),
)
uploads = s3_client.list_multipart_uploads(Bucket="foobar")["Uploads"]
assert len(uploads) == 1
assert uploads[0]["Key"] == "the-key"
2022-02-24 21:42:38 +00:00
s3_client.abort_multipart_upload(
Bucket="foobar", Key="the-key", UploadId=mpu["UploadId"]
)
2022-02-24 21:42:38 +00:00
assert "Uploads" not in s3_client.list_multipart_uploads(Bucket="foobar")
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_etag_quotes_stripped():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="foobar")
s3_client.put_object(Bucket="foobar", Key="original-key", Body="key_value")
2022-02-24 21:42:38 +00:00
mpu = s3_client.create_multipart_upload(Bucket="foobar", Key="the-key")
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
up1 = s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket="foobar",
Key="the-key",
PartNumber=1,
UploadId=mpu["UploadId"],
Body=BytesIO(part1),
)
etag1 = up1["ETag"].replace('"', "")
up2 = s3_client.upload_part_copy(
2022-02-24 21:42:38 +00:00
Bucket="foobar",
Key="the-key",
CopySource={"Bucket": "foobar", "Key": "original-key"},
CopySourceRange="0-3",
PartNumber=2,
UploadId=mpu["UploadId"],
)
etag2 = up2["CopyPartResult"]["ETag"].replace('"', "")
s3_client.complete_multipart_upload(
2022-02-24 21:42:38 +00:00
Bucket="foobar",
Key="the-key",
MultipartUpload={
"Parts": [
{"ETag": etag1, "PartNumber": 1},
{"ETag": etag2, "PartNumber": 2},
]
},
UploadId=mpu["UploadId"],
)
response = s3_client.get_object(Bucket="foobar", Key="the-key")
assert response["Body"].read() == part1 + b"key_"
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_duplicate_upload():
s3_resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
2022-02-24 21:42:38 +00:00
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_resource.create_bucket(Bucket="foobar")
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
part2 = b"1"
multipart = client.create_multipart_upload(Bucket="foobar", Key="the-key")
2022-02-24 21:42:38 +00:00
client.upload_part(
Body=BytesIO(part1),
PartNumber=1,
Bucket="foobar",
Key="the-key",
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
# same part again
up1 = client.upload_part(
Body=BytesIO(part1),
PartNumber=1,
Bucket="foobar",
Key="the-key",
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
up2 = client.upload_part(
Body=BytesIO(part2),
PartNumber=2,
Bucket="foobar",
Key="the-key",
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
client.complete_multipart_upload(
Bucket="foobar",
Key="the-key",
MultipartUpload={
"Parts": [
{"ETag": up1["ETag"], "PartNumber": 1},
{"ETag": up2["ETag"], "PartNumber": 2},
]
},
UploadId=multipart["UploadId"],
2022-02-24 21:42:38 +00:00
)
# we should get both parts as the key contents
response = client.get_object(Bucket="foobar", Key="the-key")
assert response["Body"].read() == part1 + part2
2022-02-24 21:42:38 +00:00
@mock_s3
def test_list_multiparts():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="foobar")
2022-02-24 21:42:38 +00:00
mpu1 = s3_client.create_multipart_upload(Bucket="foobar", Key="one-key")
mpu2 = s3_client.create_multipart_upload(Bucket="foobar", Key="two-key")
2022-02-24 21:42:38 +00:00
uploads = s3_client.list_multipart_uploads(Bucket="foobar")["Uploads"]
assert len(uploads) == 2
assert {u["Key"]: u["UploadId"] for u in uploads} == (
2022-02-24 21:42:38 +00:00
{"one-key": mpu1["UploadId"], "two-key": mpu2["UploadId"]}
)
s3_client.abort_multipart_upload(
Bucket="foobar", Key="the-key", UploadId=mpu2["UploadId"]
)
2022-02-24 21:42:38 +00:00
uploads = s3_client.list_multipart_uploads(Bucket="foobar")["Uploads"]
assert len(uploads) == 1
assert uploads[0]["Key"] == "one-key"
2022-02-24 21:42:38 +00:00
s3_client.abort_multipart_upload(
Bucket="foobar", Key="the-key", UploadId=mpu1["UploadId"]
)
2022-02-24 21:42:38 +00:00
res = s3_client.list_multipart_uploads(Bucket="foobar")
assert "Uploads" not in res
@mock_s3
def test_multipart_should_throw_nosuchupload_if_there_are_no_parts():
bucket = boto3.resource("s3", region_name=DEFAULT_REGION_NAME).Bucket(
"randombucketname"
)
bucket.create()
s3_object = bucket.Object("my/test2")
multipart_upload = s3_object.initiate_multipart_upload()
multipart_upload.abort()
with pytest.raises(ClientError) as ex:
list(multipart_upload.parts.all())
err = ex.value.response["Error"]
assert err["Code"] == "NoSuchUpload"
assert err["Message"] == (
"The specified upload does not exist. The upload ID may be invalid, "
"or the upload may have been aborted or completed."
)
assert err["UploadId"] == multipart_upload.id
@mock_s3
2022-02-24 21:42:38 +00:00
def test_multipart_wrong_partnumber():
bucket_name = "mputest-3593"
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket=bucket_name)
mpu = s3_client.create_multipart_upload(Bucket=bucket_name, Key="the-key")
mpu_id = mpu["UploadId"]
body = b"111"
with pytest.raises(ClientError) as ex:
s3_client.upload_part(
Bucket=bucket_name,
Key="the-key",
PartNumber=-1,
UploadId=mpu_id,
Body=body,
ContentLength=len(body),
)
err = ex.value.response["Error"]
assert err["Code"] == "NoSuchUpload"
assert err["Message"] == (
"The specified upload does not exist. The upload ID may be invalid, "
"or the upload may have been aborted or completed."
)
@mock_s3
def test_multipart_upload_with_tags():
bucket = "mybucket"
key = "test/multipartuploadtag/file.txt"
tags = "a=b"
client = boto3.client("s3", region_name="us-east-1")
client.create_bucket(Bucket=bucket)
response = client.create_multipart_upload(Bucket=bucket, Key=key, Tagging=tags)
upload = boto3.resource("s3").MultipartUpload(bucket, key, response["UploadId"])
parts = [
{
"ETag": upload.Part(i).upload(Body=os.urandom(5 * (2**20)))["ETag"],
"PartNumber": i,
}
for i in range(1, 3)
]
upload.complete(MultipartUpload={"Parts": parts})
# check tags
response = client.get_object_tagging(Bucket=bucket, Key=key)
actual = {t["Key"]: t["Value"] for t in response.get("TagSet", [])}
assert actual == {"a": "b"}
@mock_s3
def test_multipart_upload_should_return_part_10000():
bucket = "dummybucket"
s3_client = boto3.client("s3", "us-east-1")
key = "test_file"
s3_client.create_bucket(Bucket=bucket)
mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
mpu_id = mpu["UploadId"]
s3_client.upload_part(
Bucket=bucket, Key=key, PartNumber=1, UploadId=mpu_id, Body="data"
)
s3_client.upload_part(
Bucket=bucket, Key=key, PartNumber=2, UploadId=mpu_id, Body="data"
)
s3_client.upload_part(
Bucket=bucket, Key=key, PartNumber=10000, UploadId=mpu_id, Body="data"
)
all_parts = s3_client.list_parts(Bucket=bucket, Key=key, UploadId=mpu_id)["Parts"]
part_nrs = [part["PartNumber"] for part in all_parts]
assert part_nrs == [1, 2, 10000]
@mock_s3
def test_multipart_upload_without_parts():
bucket = "dummybucket"
s3_client = boto3.client("s3", "us-east-1")
key = "test_file"
s3_client.create_bucket(Bucket=bucket)
mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
mpu_id = mpu["UploadId"]
list_parts_result = s3_client.list_parts(Bucket=bucket, Key=key, UploadId=mpu_id)
assert list_parts_result["IsTruncated"] is False
@mock_s3
@pytest.mark.parametrize("part_nr", [10001, 10002, 20000])
def test_s3_multipart_upload_cannot_upload_part_over_10000(part_nr):
bucket = "dummy"
s3_client = boto3.client("s3", "us-east-1")
key = "test_file"
s3_client.create_bucket(Bucket=bucket)
mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
mpu_id = mpu["UploadId"]
with pytest.raises(ClientError) as exc:
s3_client.upload_part(
Bucket=bucket, Key=key, PartNumber=part_nr, UploadId=mpu_id, Body="data"
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidArgument"
assert err["Message"] == (
"Part number must be an integer between 1 and 10000, inclusive"
)
assert err["ArgumentName"] == "partNumber"
assert err["ArgumentValue"] == f"{part_nr}"
2022-02-24 21:42:38 +00:00
@mock_s3
def test_s3_abort_multipart_data_with_invalid_upload_and_key():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket="blah")
with pytest.raises(ClientError) as exc:
2022-02-24 21:42:38 +00:00
client.abort_multipart_upload(
Bucket="blah", Key="foobar", UploadId="dummy_upload_id"
)
err = exc.value.response["Error"]
assert err["Code"] == "NoSuchUpload"
assert err["Message"] == (
"The specified upload does not exist. The upload ID may be invalid, "
"or the upload may have been aborted or completed."
2022-02-24 21:42:38 +00:00
)
assert err["UploadId"] == "dummy_upload_id"
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_etag():
# Create Bucket so that test can run
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
2022-02-24 21:42:38 +00:00
upload_id = s3_client.create_multipart_upload(Bucket="mybucket", Key="the-key")[
"UploadId"
]
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
etags = []
etags.append(
s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket="mybucket",
Key="the-key",
PartNumber=1,
UploadId=upload_id,
Body=part1,
)["ETag"]
)
# last part, can be less than 5 MB
part2 = b"1"
etags.append(
s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket="mybucket",
Key="the-key",
PartNumber=2,
UploadId=upload_id,
Body=part2,
)["ETag"]
)
s3_client.complete_multipart_upload(
2022-02-24 21:42:38 +00:00
Bucket="mybucket",
Key="the-key",
UploadId=upload_id,
MultipartUpload={
"Parts": [
{"ETag": etag, "PartNumber": i} for i, etag in enumerate(etags, 1)
]
},
)
# we should get both parts as the key contents
resp = s3_client.get_object(Bucket="mybucket", Key="the-key")
assert resp["ETag"] == EXPECTED_ETAG
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_version():
# Create Bucket so that test can run
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
2022-02-24 21:42:38 +00:00
s3_client.put_bucket_versioning(
2022-02-24 21:42:38 +00:00
Bucket="mybucket", VersioningConfiguration={"Status": "Enabled"}
)
upload_id = s3_client.create_multipart_upload(Bucket="mybucket", Key="the-key")[
"UploadId"
]
2022-02-24 21:42:38 +00:00
part1 = b"0" * REDUCED_PART_SIZE
etags = []
etags.append(
s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket="mybucket",
Key="the-key",
PartNumber=1,
UploadId=upload_id,
Body=part1,
)["ETag"]
)
# last part, can be less than 5 MB
part2 = b"1"
etags.append(
s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket="mybucket",
Key="the-key",
PartNumber=2,
UploadId=upload_id,
Body=part2,
)["ETag"]
)
response = s3_client.complete_multipart_upload(
2022-02-24 21:42:38 +00:00
Bucket="mybucket",
Key="the-key",
UploadId=upload_id,
MultipartUpload={
"Parts": [
{"ETag": etag, "PartNumber": i} for i, etag in enumerate(etags, 1)
]
},
)
assert re.match("[-a-z0-9]+", response["VersionId"])
2022-02-24 21:42:38 +00:00
@mock_s3
@pytest.mark.parametrize(
"part_nr,msg,msg2",
[
(
-42,
"Argument max-parts must be an integer between 0 and 2147483647",
"Argument part-number-marker must be an integer between 0 and 2147483647",
),
(
2147483647 + 42,
"Provided max-parts not an integer or within integer range",
"Provided part-number-marker not an integer or within integer range",
),
],
)
def test_multipart_list_parts_invalid_argument(part_nr, msg, msg2):
s3_client = boto3.client("s3", region_name="us-east-1")
2022-02-24 21:42:38 +00:00
bucket_name = "mybucketasdfljoqwerasdfas"
s3_client.create_bucket(Bucket=bucket_name)
2022-02-24 21:42:38 +00:00
mpu = s3_client.create_multipart_upload(Bucket=bucket_name, Key="the-key")
2022-02-24 21:42:38 +00:00
mpu_id = mpu["UploadId"]
def get_parts(**kwarg):
s3_client.list_parts(
Bucket=bucket_name, Key="the-key", UploadId=mpu_id, **kwarg
)
2022-02-24 21:42:38 +00:00
with pytest.raises(ClientError) as err:
get_parts(**{"MaxParts": part_nr})
err_rsp = err.value.response["Error"]
assert err_rsp["Code"] == "InvalidArgument"
assert err_rsp["Message"] == msg
2022-02-24 21:42:38 +00:00
with pytest.raises(ClientError) as err:
get_parts(**{"PartNumberMarker": part_nr})
err_rsp = err.value.response["Error"]
assert err_rsp["Code"] == "InvalidArgument"
assert err_rsp["Message"] == msg2
2022-02-24 21:42:38 +00:00
@mock_s3
@reduced_min_part_size
def test_multipart_list_parts():
s3_client = boto3.client("s3", region_name="us-east-1")
2022-02-24 21:42:38 +00:00
bucket_name = "mybucketasdfljoqwerasdfas"
s3_client.create_bucket(Bucket=bucket_name)
2022-02-24 21:42:38 +00:00
mpu = s3_client.create_multipart_upload(Bucket=bucket_name, Key="the-key")
2022-02-24 21:42:38 +00:00
mpu_id = mpu["UploadId"]
parts = []
n_parts = 10
def get_parts_all(i):
# Get uploaded parts using default values
uploaded_parts = []
uploaded = s3_client.list_parts(
Bucket=bucket_name, Key="the-key", UploadId=mpu_id
)
2022-02-24 21:42:38 +00:00
assert uploaded["PartNumberMarker"] == 0
# Parts content check
if i > 0:
for part in uploaded["Parts"]:
uploaded_parts.append(
{"ETag": part["ETag"], "PartNumber": part["PartNumber"]}
)
assert uploaded_parts == parts
next_part_number_marker = uploaded["Parts"][-1]["PartNumber"]
else:
next_part_number_marker = 0
assert uploaded["NextPartNumberMarker"] == next_part_number_marker
assert not uploaded["IsTruncated"]
def get_parts_by_batch(i):
# Get uploaded parts by batch of 2
part_number_marker = 0
uploaded_parts = []
while "there are parts":
uploaded = s3_client.list_parts(
2022-02-24 21:42:38 +00:00
Bucket=bucket_name,
Key="the-key",
UploadId=mpu_id,
PartNumberMarker=part_number_marker,
MaxParts=2,
)
assert uploaded["PartNumberMarker"] == part_number_marker
if i > 0:
# We should received maximum 2 parts
assert len(uploaded["Parts"]) <= 2
# Store parts content for the final check
for part in uploaded["Parts"]:
uploaded_parts.append(
{"ETag": part["ETag"], "PartNumber": part["PartNumber"]}
)
# No more parts, get out the loop
if not uploaded["IsTruncated"]:
break
# Next parts batch will start with that number
part_number_marker = uploaded["NextPartNumberMarker"]
assert part_number_marker == i + 1 if len(parts) > i else i
# Final check: we received all uploaded parts
assert uploaded_parts == parts
# Check ListParts API parameters when no part was uploaded
get_parts_all(0)
get_parts_by_batch(0)
for i in range(1, n_parts + 1):
part_size = REDUCED_PART_SIZE + i
body = b"1" * part_size
part = s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket=bucket_name,
Key="the-key",
PartNumber=i,
UploadId=mpu_id,
Body=body,
ContentLength=len(body),
)
parts.append({"PartNumber": i, "ETag": part["ETag"]})
# Check ListParts API parameters while there are uploaded parts
get_parts_all(i)
get_parts_by_batch(i)
# Check ListParts API parameters when all parts were uploaded
get_parts_all(11)
get_parts_by_batch(11)
s3_client.complete_multipart_upload(
2022-02-24 21:42:38 +00:00
Bucket=bucket_name,
Key="the-key",
UploadId=mpu_id,
MultipartUpload={"Parts": parts},
)
@mock_s3
@reduced_min_part_size
def test_multipart_part_size():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3_client.create_bucket(Bucket="mybucket")
2022-02-24 21:42:38 +00:00
mpu = s3_client.create_multipart_upload(Bucket="mybucket", Key="the-key")
2022-02-24 21:42:38 +00:00
mpu_id = mpu["UploadId"]
parts = []
n_parts = 10
for i in range(1, n_parts + 1):
part_size = REDUCED_PART_SIZE + i
body = b"1" * part_size
part = s3_client.upload_part(
2022-02-24 21:42:38 +00:00
Bucket="mybucket",
Key="the-key",
PartNumber=i,
UploadId=mpu_id,
Body=body,
ContentLength=len(body),
)
parts.append({"PartNumber": i, "ETag": part["ETag"]})
s3_client.complete_multipart_upload(
2022-02-24 21:42:38 +00:00
Bucket="mybucket",
Key="the-key",
UploadId=mpu_id,
MultipartUpload={"Parts": parts},
)
for i in range(1, n_parts + 1):
obj = s3_client.head_object(Bucket="mybucket", Key="the-key", PartNumber=i)
2022-02-24 21:42:38 +00:00
assert obj["ContentLength"] == REDUCED_PART_SIZE + i
@mock_s3
def test_complete_multipart_with_empty_partlist():
"""Verify InvalidXML-error sent for MultipartUpload with empty part list."""
bucket = "testbucketthatcompletesmultipartuploadwithoutparts"
key = "test-multi-empty"
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket=bucket)
2022-03-10 14:39:59 +00:00
response = client.create_multipart_upload(Bucket=bucket, Key=key)
uid = response["UploadId"]
upload = boto3.resource("s3").MultipartUpload(bucket, key, uid)
with pytest.raises(ClientError) as exc:
upload.complete(MultipartUpload={"Parts": []})
err = exc.value.response["Error"]
assert err["Code"] == "MalformedXML"
assert err["Message"] == (
"The XML you provided was not well-formed or did not validate "
"against our published schema"
)
@mock_s3
def test_ssm_key_headers_in_create_multipart():
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "ssm-headers-bucket"
s3_client.create_bucket(Bucket=bucket_name)
kms_key_id = "random-id"
key_name = "test-file.txt"
create_multipart_response = s3_client.create_multipart_upload(
Bucket=bucket_name,
Key=key_name,
ServerSideEncryption="aws:kms",
SSEKMSKeyId=kms_key_id,
)
assert create_multipart_response["ServerSideEncryption"] == "aws:kms"
assert create_multipart_response["SSEKMSKeyId"] == kms_key_id
upload_part_response = s3_client.upload_part(
Body=b"bytes",
Bucket=bucket_name,
Key=key_name,
PartNumber=1,
UploadId=create_multipart_response["UploadId"],
)
assert upload_part_response["ServerSideEncryption"] == "aws:kms"
assert upload_part_response["SSEKMSKeyId"] == kms_key_id
parts = {"Parts": [{"PartNumber": 1, "ETag": upload_part_response["ETag"]}]}
complete_multipart_response = s3_client.complete_multipart_upload(
Bucket=bucket_name,
Key=key_name,
UploadId=create_multipart_response["UploadId"],
MultipartUpload=parts,
)
assert complete_multipart_response["ServerSideEncryption"] == "aws:kms"
assert complete_multipart_response["SSEKMSKeyId"] == kms_key_id
@mock_s3
@reduced_min_part_size
def test_generate_presigned_url_on_multipart_upload_without_acl():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "testing"
client.create_bucket(Bucket=bucket_name)
object_key = "test_multipart_object"
multipart_response = client.create_multipart_upload(
Bucket=bucket_name, Key=object_key
)
upload_id = multipart_response["UploadId"]
parts = []
n_parts = 10
for i in range(1, n_parts + 1):
part_size = REDUCED_PART_SIZE + i
body = b"1" * part_size
part = client.upload_part(
Bucket=bucket_name,
Key=object_key,
PartNumber=i,
UploadId=upload_id,
Body=body,
ContentLength=len(body),
)
parts.append({"PartNumber": i, "ETag": part["ETag"]})
client.complete_multipart_upload(
Bucket=bucket_name,
Key=object_key,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
url = client.generate_presigned_url(
"head_object", Params={"Bucket": bucket_name, "Key": object_key}
)
2023-09-27 18:34:30 +00:00
kwargs = {}
if test_proxy_mode():
add_proxy_details(kwargs)
res = requests.get(url, **kwargs)
assert res.status_code == 200
@mock_s3
@reduced_min_part_size
def test_head_object_returns_part_count():
bucket = "telstra-energy-test"
key = "test-single-multi-part"
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
client.create_bucket(Bucket=bucket)
mp_id = client.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]
num_parts = 2
parts = []
for part in range(1, num_parts + 1):
response = client.upload_part(
Body=b"x" * (REDUCED_PART_SIZE + part),
Bucket=bucket,
Key=key,
PartNumber=part,
UploadId=mp_id,
)
parts.append({"ETag": response["ETag"], "PartNumber": part})
client.complete_multipart_upload(
Bucket=bucket,
Key=key,
MultipartUpload={"Parts": parts},
UploadId=mp_id,
)
resp = client.head_object(Bucket=bucket, Key=key, PartNumber=1)
assert resp["PartsCount"] == num_parts
# Header is not returned when we do not pass PartNumber
resp = client.head_object(Bucket=bucket, Key=key)
assert "PartsCount" not in resp
@mock_s3
@reduced_min_part_size
def test_generate_presigned_url_for_multipart_upload():
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("No point in testing this outside decorator mode")
bucket_name = "mock-bucket"
file_name = "mock-file"
s3_client = boto3.client("s3")
s3_client.create_bucket(Bucket=bucket_name)
mpu = s3_client.create_multipart_upload(
Bucket=bucket_name,
Key=file_name,
)
upload_id = mpu["UploadId"]
url = s3_client.generate_presigned_url(
"upload_part",
Params={
"Bucket": bucket_name,
"Key": file_name,
"PartNumber": 1,
"UploadId": upload_id,
},
)
data = b"0" * REDUCED_PART_SIZE
resp = requests.put(url, data=data)
assert resp.status_code == 200